source/common/tcp/conn_pool.h
#pragma once

#include <list>
#include <memory>

#include "envoy/event/deferred_deletable.h"
#include "envoy/event/timer.h"
#include "envoy/network/connection.h"
#include "envoy/network/filter.h"
#include "envoy/stats/timespan.h"
#include "envoy/tcp/conn_pool.h"
#include "envoy/upstream/upstream.h"

#include "source/common/common/linked_object.h"
#include "source/common/common/logger.h"
#include "source/common/http/conn_pool_base.h"
#include "source/common/network/filter_impl.h"
#include "source/common/runtime/runtime_features.h"

namespace Envoy {
namespace Tcp {

class ConnPoolImpl;

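// Bridges the TCP-specific pool callbacks into the generic connection pool's
// AttachContext interface.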
struct TcpAttachContext : public Envoy::ConnectionPool::AttachContext {
  TcpAttachContext(Tcp::ConnectionPool::Callbacks* callbacks) : callbacks_(callbacks) {}
  Tcp::ConnectionPool::Callbacks* callbacks_;
};

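// A stream waiting in the pool for an upstream connection to become available.
// It simply carries its TcpAttachContext until the pool attaches it to a client.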
class TcpPendingStream : public Envoy::ConnectionPool::PendingStream {
public:
  TcpPendingStream(Envoy::ConnectionPool::ConnPoolImplBase& parent, bool can_send_early_data,
                   TcpAttachContext& context)
      : Envoy::ConnectionPool::PendingStream(parent, can_send_early_data), context_(context) {}
  Envoy::ConnectionPool::AttachContext& context() override { return context_; }

  TcpAttachContext context_;
};

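// A pool-owned client wrapping a single upstream TCP connection. Unlike HTTP
// clients, a TCP client serves at most one caller (stream) at a time.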
class ActiveTcpClient : public Envoy::ConnectionPool::ActiveClient {
public:
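  // Read filter which proxies data received on the upstream connection to the
  // owning ActiveTcpClient.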
  struct ConnReadFilter : public Network::ReadFilterBaseImpl {
    ConnReadFilter(ActiveTcpClient& parent) : parent_(parent) {}

    // Network::ReadFilter
    Network::FilterStatus onData(Buffer::Instance& data, bool end_stream) override {
      parent_.onUpstreamData(data, end_stream);
      return Network::FilterStatus::StopIteration;
    }
    ActiveTcpClient& parent_;
  };

  // This acts as the bridge between the ActiveTcpClient and an individual TCP connection.
  class TcpConnectionData : public Envoy::Tcp::ConnectionPool::ConnectionData {
  public:
    TcpConnectionData(ActiveTcpClient& parent, Network::ClientConnection& connection)
        : parent_(&parent), connection_(connection) {
      parent_->tcp_connection_data_ = this;
    }
    ~TcpConnectionData() override {
      // Generally it is the case that TcpConnectionData will be destroyed before the
      // ActiveTcpClient. Because ordering on the deferred delete list is not guaranteed in the
      // case of a disconnect, make sure parent_ is valid before doing clean-up.
      if (parent_) {
        parent_->clearCallbacks();
      }
    }

    Network::ClientConnection& connection() override { return connection_; }
    void setConnectionState(ConnectionPool::ConnectionStatePtr&& state) override {
      parent_->connection_state_ = std::move(state);
    }

    void addUpstreamCallbacks(ConnectionPool::UpstreamCallbacks& callbacks) override {
      parent_->callbacks_ = &callbacks;
    }
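    // Called when the ActiveTcpClient is destroyed first, so that this object
    // does not call back into a dangling parent_.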
    void release() { parent_ = nullptr; }

  protected:
    ConnectionPool::ConnectionState* connectionState() override {
      return parent_->connection_state_.get();
    }

  private:
    ActiveTcpClient* parent_;
    Network::ClientConnection& connection_;
  };

  ActiveTcpClient(Envoy::ConnectionPool::ConnPoolImplBase& parent,
                  const Upstream::HostConstSharedPtr& host, uint64_t concurrent_stream_limit,
                  absl::optional<std::chrono::milliseconds> idle_timeout);
  ~ActiveTcpClient() override;

  // Override the defaults of Envoy::ConnectionPool::ActiveClient for class-specific functions.
  // Network::ConnectionCallbacks
  void onEvent(Network::ConnectionEvent event) override;
  void onAboveWriteBufferHighWatermark() override { callbacks_->onAboveWriteBufferHighWatermark(); }
  void onBelowWriteBufferLowWatermark() override { callbacks_->onBelowWriteBufferLowWatermark(); }

  // Undo the readDisable done in onEvent(Connected): now that there is an associated
  // connection, drain any buffered data.
  void readEnableIfNew() {
    // For Envoy's own use of ActiveTcpClient this function is expected to be
    // called only once. Other users of the TcpConnPool may recycle TCP
    // connections, and this guard protects them against read-enabling more than once.
    if (!associated_before_) {
      associated_before_ = true;
      connection_->readDisable(false);
      // Also make sure the connection proxies all remaining TCP data before it
      // acts on a FIN.
      connection_->detectEarlyCloseWhenReadDisabled(false);
    }
  }

  void initializeReadFilters() override { connection_->initializeReadFilters(); }
  absl::optional<Http::Protocol> protocol() const override { return {}; }
  void close() override;
  uint32_t numActiveStreams() const override { return callbacks_ ? 1 : 0; }
  bool closingWithIncompleteStream() const override { return false; }
  uint64_t id() const override { return connection_->id(); }

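  // Delivers upstream data to the attached caller, or closes the connection if
  // no caller is currently attached.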
  void onUpstreamData(Buffer::Instance& data, bool end_stream) {
    if (callbacks_) {
      callbacks_->onUpstreamData(data, end_stream);
    } else {
      close();
    }
  }
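  // Detaches the current caller from this client, e.g. when the associated
  // TcpConnectionData is destroyed.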
  virtual void clearCallbacks();

  // Called if the underlying connection has been idle for longer than the
  // cluster's tcpPoolIdleTimeout().
  void onIdleTimeout();
  void disableIdleTimer();
  void setIdleTimer();

  std::shared_ptr<ConnReadFilter> read_filter_handle_;
  Envoy::ConnectionPool::ConnPoolImplBase& parent_;
  ConnectionPool::UpstreamCallbacks* callbacks_{};
  Network::ClientConnectionPtr connection_;
  ConnectionPool::ConnectionStatePtr connection_state_;
  TcpConnectionData* tcp_connection_data_{};
  bool associated_before_{};
  absl::optional<std::chrono::milliseconds> idle_timeout_;
  Event::TimerPtr idle_timer_;
};

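// The TCP connection pool, implementing the Tcp::ConnectionPool::Instance API
// on top of the protocol-agnostic ConnPoolImplBase.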
class ConnPoolImpl : public Envoy::ConnectionPool::ConnPoolImplBase,
                     public Tcp::ConnectionPool::Instance {
public:
  ConnPoolImpl(Event::Dispatcher& dispatcher, Upstream::HostConstSharedPtr host,
               Upstream::ResourcePriority priority,
               const Network::ConnectionSocket::OptionsSharedPtr& options,
               Network::TransportSocketOptionsConstSharedPtr transport_socket_options,
               Upstream::ClusterConnectivityState& state,
               absl::optional<std::chrono::milliseconds> idle_timeout)
      : Envoy::ConnectionPool::ConnPoolImplBase(host, priority, dispatcher, options,
                                                transport_socket_options, state),
        idle_timeout_(idle_timeout) {}
  ~ConnPoolImpl() override { destructAllConnections(); }

  // Event::DeferredDeletable
  void deleteIsPending() override { deleteIsPendingImpl(); }

  void addIdleCallback(IdleCb cb) override { addIdleCallbackImpl(cb); }
  bool isIdle() const override { return isIdleImpl(); }
  void drainConnections(Envoy::ConnectionPool::DrainBehavior drain_behavior) override {
    drainConnectionsImpl(drain_behavior);
    if (drain_behavior == Envoy::ConnectionPool::DrainBehavior::DrainAndDelete) {
      return;
    }
    // Legacy behavior for the TCP connection pool marks all connecting clients
    // as draining.
    for (auto& connecting_client : connecting_clients_) {
      if (connecting_client->remaining_streams_ > 1) {
        uint64_t old_limit = connecting_client->effectiveConcurrentStreamLimit();
        connecting_client->remaining_streams_ = 1;
        if (connecting_client->effectiveConcurrentStreamLimit() < old_limit) {
          decrConnectingAndConnectedStreamCapacity(
              old_limit - connecting_client->effectiveConcurrentStreamLimit(), *connecting_client);
        }
      }
    }
  }

  void closeConnections() override {
    for (auto* list : {&ready_clients_, &busy_clients_, &connecting_clients_}) {
      while (!list->empty()) {
        list->front()->close();
      }
    }
  }
  ConnectionPool::Cancellable* newConnection(Tcp::ConnectionPool::Callbacks& callbacks) override {
    TcpAttachContext context(&callbacks);
    // TLS early data over TCP is not supported yet.
    return newStreamImpl(context, /*can_send_early_data=*/false);
  }
  bool maybePreconnect(float preconnect_ratio) override {
    return maybePreconnectImpl(preconnect_ratio);
  }

  ConnectionPool::Cancellable* newPendingStream(Envoy::ConnectionPool::AttachContext& context,
                                                bool can_send_early_data) override {
    Envoy::ConnectionPool::PendingStreamPtr pending_stream = std::make_unique<TcpPendingStream>(
        *this, can_send_early_data, typedContext<TcpAttachContext>(context));
    return addPendingStream(std::move(pending_stream));
  }

  Upstream::HostDescriptionConstSharedPtr host() const override {
    return Envoy::ConnectionPool::ConnPoolImplBase::host();
  }

  Envoy::ConnectionPool::ActiveClientPtr instantiateActiveClient() override {
    return std::make_unique<ActiveTcpClient>(*this, Envoy::ConnectionPool::ConnPoolImplBase::host(),
                                             1, idle_timeout_);
  }

  void onPoolReady(Envoy::ConnectionPool::ActiveClient& client,
                   Envoy::ConnectionPool::AttachContext& context) override {
    ActiveTcpClient* tcp_client = static_cast<ActiveTcpClient*>(&client);
    tcp_client->readEnableIfNew();
    auto* callbacks = typedContext<TcpAttachContext>(context).callbacks_;
    std::unique_ptr<Envoy::Tcp::ConnectionPool::ConnectionData> connection_data =
        std::make_unique<ActiveTcpClient::TcpConnectionData>(*tcp_client, *tcp_client->connection_);
    callbacks->onPoolReady(std::move(connection_data), tcp_client->real_host_description_);

    // If the callbacks took ownership of the connection data, the TCP client is
    // now in use; stop its idle timer.
    if (!connection_data) {
      tcp_client->disableIdleTimer();
    }
  }

  void onPoolFailure(const Upstream::HostDescriptionConstSharedPtr& host_description,
                     absl::string_view failure_reason, ConnectionPool::PoolFailureReason reason,
                     Envoy::ConnectionPool::AttachContext& context) override {
    auto* callbacks = typedContext<TcpAttachContext>(context).callbacks_;
    callbacks->onPoolFailure(reason, failure_reason, host_description);
  }

  bool enforceMaxRequests() const override { return false; }
  // These two functions exist to verify parity between the old and new TCP
  // connection pools in tests.
  virtual void onConnReleased(Envoy::ConnectionPool::ActiveClient&) {}
  virtual void onConnDestroyed() {}

  absl::optional<std::chrono::milliseconds> idle_timeout_;
};
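
// A minimal usage sketch (hypothetical caller code, not part of this header),
// showing how a caller might request a pooled upstream connection. The
// `dispatcher`, `host`, `state`, and `MyCallbacks` names are assumptions here;
// MyCallbacks stands for any implementation of Tcp::ConnectionPool::Callbacks.
//
//   ConnPoolImpl pool(dispatcher, host, Upstream::ResourcePriority::Default,
//                     /*options=*/nullptr, /*transport_socket_options=*/nullptr,
//                     state, /*idle_timeout=*/absl::nullopt);
//   MyCallbacks callbacks;
//   // The pool invokes callbacks.onPoolReady() or callbacks.onPoolFailure()
//   // asynchronously; the returned handle can cancel a still-pending request.
//   Envoy::ConnectionPool::Cancellable* handle = pool.newConnection(callbacks);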

} // namespace Tcp
} // namespace Envoy