1
#pragma once
2

            
3
#include <list>
4
#include <memory>
5

            
6
#include "envoy/event/deferred_deletable.h"
7
#include "envoy/event/timer.h"
8
#include "envoy/network/connection.h"
9
#include "envoy/network/filter.h"
10
#include "envoy/server/overload/overload_manager.h"
11
#include "envoy/stats/timespan.h"
12
#include "envoy/tcp/conn_pool.h"
13
#include "envoy/upstream/upstream.h"
14

            
15
#include "source/common/common/linked_object.h"
16
#include "source/common/common/logger.h"
17
#include "source/common/http/conn_pool_base.h"
18
#include "source/common/network/filter_impl.h"
19
#include "source/common/runtime/runtime_features.h"
20

            
21
namespace Envoy {
22
namespace Tcp {
23

            
24
class ConnPoolImpl;
25

            
26
struct TcpAttachContext : public Envoy::ConnectionPool::AttachContext {
27
2243
  TcpAttachContext(Tcp::ConnectionPool::Callbacks* callbacks) : callbacks_(callbacks) {}
28
  Tcp::ConnectionPool::Callbacks* callbacks_;
29
};
30

            
31
class TcpPendingStream : public Envoy::ConnectionPool::PendingStream {
32
public:
33
  TcpPendingStream(Envoy::ConnectionPool::ConnPoolImplBase& parent, bool can_send_early_data,
34
                   TcpAttachContext& context)
35
2218
      : Envoy::ConnectionPool::PendingStream(parent, can_send_early_data), context_(context) {}
36
2033
  Envoy::ConnectionPool::AttachContext& context() override { return context_; }
37

            
38
  TcpAttachContext context_;
39
};
40

            
41
class ActiveTcpClient : public Envoy::ConnectionPool::ActiveClient {
42
public:
43
  struct ConnReadFilter : public Network::ReadFilterBaseImpl {
44
2167
    ConnReadFilter(ActiveTcpClient& parent) : parent_(parent) {}
45

            
46
    // Network::ReadFilter
47
2969
    Network::FilterStatus onData(Buffer::Instance& data, bool end_stream) override {
48
2969
      parent_.onUpstreamData(data, end_stream);
49
2969
      return Network::FilterStatus::StopIteration;
50
2969
    }
51
    ActiveTcpClient& parent_;
52
  };
53

            
54
  // This acts as the bridge between the ActiveTcpClient and an individual TCP connection.
55
  class TcpConnectionData : public Envoy::Tcp::ConnectionPool::ConnectionData {
56
  public:
57
    TcpConnectionData(ActiveTcpClient& parent, Network::ClientConnection& connection)
58
2042
        : parent_(&parent), connection_(connection) {
59
2042
      parent_->tcp_connection_data_ = this;
60
2042
    }
61
2042
    ~TcpConnectionData() override {
62
      // Generally it is the case that TcpConnectionData will be destroyed before the
63
      // ActiveTcpClient. Because ordering on the deferred delete list is not guaranteed in the
64
      // case of a disconnect, make sure parent_ is valid before doing clean-up.
65
2042
      if (parent_) {
66
2033
        parent_->clearCallbacks();
67
2033
      }
68
2042
    }
69

            
70
17315
    Network::ClientConnection& connection() override { return connection_; }
71
270
    void setConnectionState(ConnectionPool::ConnectionStatePtr&& state) override {
72
270
      parent_->connection_state_ = std::move(state);
73
270
    }
74

            
75
2042
    void addUpstreamCallbacks(ConnectionPool::UpstreamCallbacks& callbacks) override {
76
2042
      parent_->callbacks_ = &callbacks;
77
2042
    }
78
9
    void release() { parent_ = nullptr; }
79

            
80
  protected:
81
536
    ConnectionPool::ConnectionState* connectionState() override {
82
536
      return parent_->connection_state_.get();
83
536
    }
84

            
85
  private:
86
    ActiveTcpClient* parent_;
87
    Network::ClientConnection& connection_;
88
  };
89

            
90
  ActiveTcpClient(Envoy::ConnectionPool::ConnPoolImplBase& parent,
91
                  const Upstream::HostConstSharedPtr& host, uint32_t concurrent_stream_limit,
92
                  absl::optional<std::chrono::milliseconds> idle_timeout);
93
  ~ActiveTcpClient() override;
94

            
95
  // Override the default's of Envoy::ConnectionPool::ActiveClient for class-specific functions.
96
  // Network::ConnectionCallbacks
97
  void onEvent(Network::ConnectionEvent event) override;
98
14
  void onAboveWriteBufferHighWatermark() override {
99
14
    if (callbacks_) {
100
14
      callbacks_->onAboveWriteBufferHighWatermark();
101
14
    }
102
14
  }
103
14
  void onBelowWriteBufferLowWatermark() override {
104
14
    if (callbacks_) {
105
14
      callbacks_->onBelowWriteBufferLowWatermark();
106
14
    }
107
14
  }
108

            
109
  // Undos the readDisable done in onEvent(Connected)
110
  void readEnableIfNew();
111

            
112
2138
  void initializeReadFilters() override { connection_->initializeReadFilters(); }
113
59
  absl::optional<Http::Protocol> protocol() const override { return {}; }
114
  void
115
  close(Envoy::Network::ConnectionCloseType type = Envoy::Network::ConnectionCloseType::NoFlush,
116
        absl::string_view details = "") override;
117
19269
  uint32_t numActiveStreams() const override { return callbacks_ ? 1 : 0; }
118
2108
  bool closingWithIncompleteStream() const override { return false; }
119
41
  uint64_t id() const override { return connection_->id(); }
120

            
121
2969
  void onUpstreamData(Buffer::Instance& data, bool end_stream) {
122
2969
    if (callbacks_) {
123
2969
      callbacks_->onUpstreamData(data, end_stream);
124
2969
    } else {
125
      close();
126
    }
127
2969
  }
128
  virtual void clearCallbacks();
129

            
130
  // Called if the underlying connection is idle over the cluster's tcpPoolIdleTimeout()
131
  void onIdleTimeout();
132
  void disableIdleTimer();
133
  void setIdleTimer();
134

            
135
  std::shared_ptr<ConnReadFilter> read_filter_handle_;
136
  Envoy::ConnectionPool::ConnPoolImplBase& parent_;
137
  ConnectionPool::UpstreamCallbacks* callbacks_{};
138
  Network::ClientConnectionPtr connection_;
139
  ConnectionPool::ConnectionStatePtr connection_state_;
140
  TcpConnectionData* tcp_connection_data_{};
141
  bool associated_before_{};
142
  absl::optional<std::chrono::milliseconds> idle_timeout_;
143
  Event::TimerPtr idle_timer_;
144
};
145

            
146
class ConnPoolImpl : public Envoy::ConnectionPool::ConnPoolImplBase,
147
                     public Tcp::ConnectionPool::Instance {
148
public:
149
  ConnPoolImpl(Event::Dispatcher& dispatcher, Upstream::HostConstSharedPtr host,
150
               Upstream::ResourcePriority priority,
151
               const Network::ConnectionSocket::OptionsSharedPtr& options,
152
               Network::TransportSocketOptionsConstSharedPtr transport_socket_options,
153
               Upstream::ClusterConnectivityState& state,
154
               absl::optional<std::chrono::milliseconds> idle_timeout,
155
               Server::OverloadManager& overload_manager)
156
798
      : Envoy::ConnectionPool::ConnPoolImplBase(host, priority, dispatcher, options,
157
798
                                                transport_socket_options, state, overload_manager),
158
798
        idle_timeout_(idle_timeout) {}
159
798
  ~ConnPoolImpl() override { destructAllConnections(); }
160

            
161
  // Event::DeferredDeletable
162
762
  void deleteIsPending() override { deleteIsPendingImpl(); }
163

            
164
771
  void addIdleCallback(IdleCb cb) override { addIdleCallbackImpl(cb); }
165
5
  bool isIdle() const override { return isIdleImpl(); }
166
  void drainConnections(Envoy::ConnectionPool::DrainBehavior drain_behavior) override;
167
  void closeConnections() override;
168
  ConnectionPool::Cancellable* newConnection(Tcp::ConnectionPool::Callbacks& callbacks) override;
169
33
  bool maybePreconnect(float preconnect_ratio) override {
170
33
    return maybePreconnectImpl(preconnect_ratio);
171
33
  }
172
  ConnectionPool::Cancellable* newPendingStream(Envoy::ConnectionPool::AttachContext& context,
173
                                                bool can_send_early_data) override;
174
756
  Upstream::HostDescriptionConstSharedPtr host() const override {
175
756
    return Envoy::ConnectionPool::ConnPoolImplBase::host();
176
756
  }
177
  Envoy::ConnectionPool::ActiveClientPtr instantiateActiveClient() override;
178
  void onPoolReady(Envoy::ConnectionPool::ActiveClient& client,
179
                   Envoy::ConnectionPool::AttachContext& context) override;
180
  void onPoolFailure(const Upstream::HostDescriptionConstSharedPtr& host_description,
181
                     absl::string_view failure_reason, ConnectionPool::PoolFailureReason reason,
182
                     Envoy::ConnectionPool::AttachContext& context) override;
183
2042
  bool enforceMaxRequests() const override { return false; }
184
  // These two functions exist for testing parity between old and new Tcp Connection Pools.
185
24
  virtual void onConnReleased(Envoy::ConnectionPool::ActiveClient&) {}
186
4
  virtual void onConnDestroyed() {}
187

            
188
  absl::optional<std::chrono::milliseconds> idle_timeout_;
189
};
190

            
191
} // namespace Tcp
192
} // namespace Envoy