Line data Source code
1 : #pragma once
2 :
3 : #include "envoy/common/conn_pool.h"
4 : #include "envoy/event/dispatcher.h"
5 : #include "envoy/network/connection.h"
6 : #include "envoy/stats/timespan.h"
7 : #include "envoy/upstream/cluster_manager.h"
8 :
9 : #include "source/common/common/debug_recursion_checker.h"
10 : #include "source/common/common/dump_state_utils.h"
11 : #include "source/common/common/linked_object.h"
12 :
13 : #include "absl/strings/string_view.h"
14 : #include "fmt/ostream.h"
15 :
16 : namespace Envoy {
17 : namespace ConnectionPool {
18 :
19 : class ConnPoolImplBase;
20 :
21 : // A placeholder struct for whatever data a given connection pool needs to
22 : // successfully attach an upstream connection to a downstream connection.
23 : struct AttachContext {
24 : // Add a virtual destructor to allow for the dynamic_cast ASSERT in typedContext.
25 424 : virtual ~AttachContext() = default;
26 : };
27 :
28 : // ActiveClient provides a base class for connection pool clients that handles connection timings
29 : // as well as managing the connection timeout.
30 : class ActiveClient : public LinkedObject<ActiveClient>,
31 : public Network::ConnectionCallbacks,
32 : public Event::DeferredDeletable,
33 : protected Logger::Loggable<Logger::Id::pool> {
34 : public:
35 : ActiveClient(ConnPoolImplBase& parent, uint32_t lifetime_stream_limit,
36 : uint32_t effective_concurrent_streams, uint32_t concurrent_stream_limit);
37 : ActiveClient(ConnPoolImplBase& parent, uint32_t lifetime_stream_limit,
38 : uint32_t concurrent_stream_limit);
39 : ~ActiveClient() override;
40 :
41 173 : virtual void releaseResources() { releaseResourcesBase(); }
42 : void releaseResourcesBase();
43 :
44 : // Network::ConnectionCallbacks
45 0 : void onAboveWriteBufferHighWatermark() override {}
46 0 : void onBelowWriteBufferLowWatermark() override {}
47 :
48 : // Called if the connection does not complete within the cluster's connectTimeout()
49 : void onConnectTimeout();
50 :
51 : // Called if the maximum connection duration is reached.
52 : void onConnectionDurationTimeout();
53 :
54 : // Returns the concurrent stream limit, accounting for if the total stream limit
55 : // is less than the concurrent stream limit.
56 0 : virtual uint32_t effectiveConcurrentStreamLimit() const {
57 0 : return std::min(remaining_streams_, concurrent_stream_limit_);
58 0 : }
59 :
60 : // Returns the application protocol, or absl::nullopt for TCP.
61 : virtual absl::optional<Http::Protocol> protocol() const PURE;
62 :
63 1025 : virtual int64_t currentUnusedCapacity() const {
64 1025 : int64_t remaining_concurrent_streams =
65 1025 : static_cast<int64_t>(concurrent_stream_limit_) - numActiveStreams();
66 :
67 1025 : return std::min<int64_t>(remaining_streams_, remaining_concurrent_streams);
68 1025 : }
69 :
70 : // Initialize upstream read filters. Called when connected.
71 : virtual void initializeReadFilters() PURE;
72 :
73 : // Closes the underlying connection.
74 : virtual void close() PURE;
75 : // Returns the ID of the underlying connection.
76 : virtual uint64_t id() const PURE;
77 : // Returns true if this closed with an incomplete stream, for stats tracking/ purposes.
78 : virtual bool closingWithIncompleteStream() const PURE;
79 : // Returns the number of active streams on this connection.
80 : virtual uint32_t numActiveStreams() const PURE;
81 :
82 : // Return true if it is ready to dispatch the next stream.
83 165 : virtual bool readyForStream() const {
84 165 : ASSERT(!supportsEarlyData());
85 165 : return state_ == State::Ready;
86 165 : }
87 :
88 : // This function is called onStreamClosed to see if there was a negative delta
89 : // and (if necessary) update associated bookkeeping.
90 : // HTTP/1 and TCP pools can not have negative delta so the default implementation simply returns
91 : // false. The HTTP/2 connection pool can have this state, so overrides this function.
92 0 : virtual bool hadNegativeDeltaOnStreamClosed() { return false; }
93 :
94 : enum class State {
95 : Connecting, // Connection is not yet established.
96 : ReadyForEarlyData, // Any additional early data stream can be immediately dispatched to this
97 : // connection.
98 : Ready, // Additional streams may be immediately dispatched to this connection.
99 : Busy, // Connection is at its concurrent stream limit.
100 : Draining, // No more streams can be dispatched to this connection, and it will be
101 : // closed when all streams complete.
102 : Closed // Connection is closed and object is queued for destruction.
103 : };
104 :
105 1450 : State state() const { return state_; }
106 :
107 425 : void setState(State state) {
108 425 : if (state == State::ReadyForEarlyData && !supportsEarlyData()) {
109 0 : IS_ENVOY_BUG("Unable to set state to ReadyForEarlyData in a client which does not support "
110 0 : "early data.");
111 0 : return;
112 0 : }
113 : // If the client is transitioning to draining, update the remaining
114 : // streams and pool and cluster capacity.
115 425 : if (state == State::Draining) {
116 0 : drain();
117 0 : }
118 425 : state_ = state;
119 425 : }
120 :
121 : // Sets the remaining streams to 0, and updates pool and cluster capacity.
122 : virtual void drain();
123 :
124 787 : virtual bool hasHandshakeCompleted() const {
125 787 : ASSERT(!supportsEarlyData());
126 787 : return state_ != State::Connecting;
127 787 : }
128 :
129 : ConnPoolImplBase& parent_;
130 : // The count of remaining streams allowed for this connection.
131 : // This will start out as the total number of streams per connection if capped
132 : // by configuration, or it will be set to std::numeric_limits<uint32_t>::max() to be
133 : // (functionally) unlimited.
134 : // TODO: this could be moved to an optional to make it actually unlimited.
135 : uint32_t remaining_streams_;
136 : // The will start out as the upper limit of max concurrent streams for this connection
137 : // if capped by configuration, or it will be set to std::numeric_limits<uint32_t>::max()
138 : // to be (functionally) unlimited.
139 : uint32_t configured_stream_limit_;
140 : // The max concurrent stream for this connection, it's initialized by `configured_stream_limit_`
141 : // and can be adjusted by SETTINGS frame, but the max value of it can't exceed
142 : // `configured_stream_limit_`.
143 : uint32_t concurrent_stream_limit_;
144 : Upstream::HostDescriptionConstSharedPtr real_host_description_;
145 : Stats::TimespanPtr conn_connect_ms_;
146 : Stats::TimespanPtr conn_length_;
147 : Event::TimerPtr connect_timer_;
148 : Event::TimerPtr connection_duration_timer_;
149 : bool resources_released_{false};
150 : bool timed_out_{false};
151 : // TODO(danzh) remove this once http codec exposes the handshake state for h3.
152 : bool has_handshake_completed_{false};
153 :
154 : protected:
155 : // HTTP/3 subclass should override this.
156 0 : virtual bool supportsEarlyData() const { return false; }
157 :
158 : private:
159 : State state_{State::Connecting};
160 : };
161 :
162 : // PendingStream is the base class tracking streams for which a connection has been created but not
163 : // yet established.
164 : class PendingStream : public LinkedObject<PendingStream>, public ConnectionPool::Cancellable {
165 : public:
166 : PendingStream(ConnPoolImplBase& parent, bool can_send_early_data);
167 : ~PendingStream() override;
168 :
169 : // ConnectionPool::Cancellable
170 : void cancel(Envoy::ConnectionPool::CancelPolicy policy) override;
171 :
172 : // The context here returns a pointer to whatever context is provided with newStream(),
173 : // which will be passed back to the parent in onPoolReady or onPoolFailure.
174 : virtual AttachContext& context() PURE;
175 :
176 : ConnPoolImplBase& parent_;
177 : // The request can be sent as early data.
178 : bool can_send_early_data_;
179 : };
180 :
181 : using PendingStreamPtr = std::unique_ptr<PendingStream>;
182 :
183 : using ActiveClientPtr = std::unique_ptr<ActiveClient>;
184 :
185 : // Base class that handles stream queueing logic shared between connection pool implementations.
186 : class ConnPoolImplBase : protected Logger::Loggable<Logger::Id::pool> {
187 : public:
188 : ConnPoolImplBase(Upstream::HostConstSharedPtr host, Upstream::ResourcePriority priority,
189 : Event::Dispatcher& dispatcher,
190 : const Network::ConnectionSocket::OptionsSharedPtr& options,
191 : const Network::TransportSocketOptionsConstSharedPtr& transport_socket_options,
192 : Upstream::ClusterConnectivityState& state);
193 : virtual ~ConnPoolImplBase();
194 :
195 : void deleteIsPendingImpl();
196 : // By default, the connection pool will track connected and connecting stream
197 : // capacity as streams are created and destroyed. QUIC does custom stream
198 : // accounting so will override this to false.
199 404 : virtual bool trackStreamCapacity() { return true; }
200 :
201 : // A helper function to get the specific context type from the base class context.
202 554 : template <class T> T& typedContext(AttachContext& context) {
203 554 : ASSERT(dynamic_cast<T*>(&context) != nullptr);
204 554 : return *static_cast<T*>(&context);
205 554 : }
206 :
207 : // Determines if prefetching is warranted based on the number of streams in
208 : // use, pending streams, anticipated and/or currently unused capacity, and
209 : // preconnect configuration.
210 : //
211 : // If anticipate_incoming_stream is true this assumes a call to newStream is
212 : // pending, which is true for global preconnect.
213 : static bool shouldConnect(size_t pending_streams, size_t active_streams,
214 : int64_t connecting_and_connected_capacity, float preconnect_ratio,
215 : bool anticipate_incoming_stream = false);
216 :
217 : // Envoy::ConnectionPool::Instance implementation helpers
218 : void addIdleCallbackImpl(Instance::IdleCb cb);
219 : // Returns true if the pool is idle.
220 : bool isIdleImpl() const;
221 : void drainConnectionsImpl(DrainBehavior drain_behavior);
222 3219 : const Upstream::HostConstSharedPtr& host() const { return host_; }
223 : // Called if this pool is likely to be picked soon, to determine if it's worth preconnecting.
224 : bool maybePreconnectImpl(float global_preconnect_ratio);
225 :
226 : // Closes and destroys all connections. This must be called in the destructor of
227 : // derived classes because the derived ActiveClient will downcast parent_ to a more
228 : // specific type of ConnPoolImplBase, but if the more specific part is already destructed
229 : // (due to bottom-up destructor ordering in c++) that access will be invalid.
230 : void destructAllConnections();
231 :
232 : // Returns a new instance of ActiveClient.
233 : virtual ActiveClientPtr instantiateActiveClient() PURE;
234 :
235 : // Gets a pointer to the list that currently owns this client.
236 : std::list<ActiveClientPtr>& owningList(ActiveClient::State state);
237 :
238 : // Removes the PendingStream from the list of streams. Called when the PendingStream is
239 : // cancelled, e.g. when the stream is reset before a connection has been established.
240 : void onPendingStreamCancel(PendingStream& stream, Envoy::ConnectionPool::CancelPolicy policy);
241 :
242 : // Fails all pending streams, calling onPoolFailure on the associated callbacks.
243 : void purgePendingStreams(const Upstream::HostDescriptionConstSharedPtr& host_description,
244 : absl::string_view failure_reason,
245 : ConnectionPool::PoolFailureReason pool_failure_reason);
246 :
247 : // Closes any idle connections as this pool is drained.
248 : void closeIdleConnectionsForDrainingPool();
249 :
250 : // Changes the state_ of an ActiveClient and moves to the appropriate list.
251 : void transitionActiveClientState(ActiveClient& client, ActiveClient::State new_state);
252 :
253 : void onConnectionEvent(ActiveClient& client, absl::string_view failure_reason,
254 : Network::ConnectionEvent event);
255 :
256 : // Check if the pool has gone idle and invoke idle notification callbacks.
257 : void checkForIdleAndNotify();
258 :
259 : // See if the pool has gone idle. If we're draining, this will also close idle connections.
260 : void checkForIdleAndCloseIdleConnsIfDraining();
261 :
262 : void scheduleOnUpstreamReady();
263 : ConnectionPool::Cancellable* newStreamImpl(AttachContext& context, bool can_send_early_data);
264 :
265 : virtual ConnectionPool::Cancellable* newPendingStream(AttachContext& context,
266 : bool can_send_early_data) PURE;
267 :
268 : virtual void attachStreamToClient(Envoy::ConnectionPool::ActiveClient& client,
269 : AttachContext& context);
270 :
271 : virtual void onPoolFailure(const Upstream::HostDescriptionConstSharedPtr& host_description,
272 : absl::string_view failure_reason,
273 : ConnectionPool::PoolFailureReason pool_failure_reason,
274 : AttachContext& context) PURE;
275 : virtual void onPoolReady(ActiveClient& client, AttachContext& context) PURE;
276 : // Called by derived classes any time a stream is completed or destroyed for any reason.
277 : void onStreamClosed(Envoy::ConnectionPool::ActiveClient& client, bool delay_attaching_stream);
278 :
279 933 : Event::Dispatcher& dispatcher() { return dispatcher_; }
280 692 : Upstream::ResourcePriority priority() const { return priority_; }
281 173 : const Network::ConnectionSocket::OptionsSharedPtr& socketOptions() { return socket_options_; }
282 346 : const Network::TransportSocketOptionsConstSharedPtr& transportSocketOptions() {
283 346 : return transport_socket_options_;
284 346 : }
285 0 : bool hasPendingStreams() const { return !pending_streams_.empty(); }
286 :
287 0 : void decrClusterStreamCapacity(uint32_t delta) {
288 0 : state_.decrConnectingAndConnectedStreamCapacity(delta);
289 0 : }
290 0 : void incrClusterStreamCapacity(uint32_t delta) {
291 0 : state_.incrConnectingAndConnectedStreamCapacity(delta);
292 0 : }
293 0 : void dumpState(std::ostream& os, int indent_level = 0) const {
294 0 : const char* spaces = spacesForLevel(indent_level);
295 0 : os << spaces << "ConnPoolImplBase " << this << DUMP_MEMBER(ready_clients_.size())
296 0 : << DUMP_MEMBER(busy_clients_.size()) << DUMP_MEMBER(connecting_clients_.size())
297 0 : << DUMP_MEMBER(connecting_stream_capacity_) << DUMP_MEMBER(num_active_streams_)
298 0 : << DUMP_MEMBER(pending_streams_.size())
299 0 : << " per upstream preconnect ratio: " << perUpstreamPreconnectRatio();
300 0 : }
301 :
302 0 : friend std::ostream& operator<<(std::ostream& os, const ConnPoolImplBase& s) {
303 0 : s.dumpState(os);
304 0 : return os;
305 0 : }
306 0 : Upstream::ClusterConnectivityState& state() { return state_; }
307 :
308 : void decrConnectingAndConnectedStreamCapacity(uint32_t delta, ActiveClient& client);
309 : void incrConnectingAndConnectedStreamCapacity(uint32_t delta, ActiveClient& client);
310 :
311 : // Called when an upstream is ready to serve pending streams.
312 : void onUpstreamReady();
313 :
314 : // Called when an upstream is ready to serve early data streams.
315 : void onUpstreamReadyForEarlyData(ActiveClient& client);
316 :
317 : protected:
318 165 : virtual void onConnected(Envoy::ConnectionPool::ActiveClient&) {}
319 8 : virtual void onConnectFailed(Envoy::ConnectionPool::ActiveClient&) {}
320 :
321 : enum class ConnectionResult {
322 : FailedToCreateConnection,
323 : CreatedNewConnection,
324 : ShouldNotConnect,
325 : NoConnectionRateLimited,
326 : CreatedButRateLimited,
327 : };
328 : // Creates up to 3 connections, based on the preconnect ratio.
329 : // Returns the ConnectionResult of the last attempt.
330 : ConnectionResult tryCreateNewConnections();
331 :
332 : // Creates a new connection if there is sufficient demand, it is allowed by resourceManager, or
333 : // to avoid starving this pool.
334 : // Demand is determined either by perUpstreamPreconnectRatio() or global_preconnect_ratio
335 : // if this is called by maybePreconnect()
336 : ConnectionResult tryCreateNewConnection(float global_preconnect_ratio = 0);
337 :
338 : // A helper function which determines if a canceled pending connection should
339 : // be closed as excess or not.
340 : bool connectingConnectionIsExcess(const ActiveClient& client) const;
341 :
342 : // A helper function which determines if a new incoming stream should trigger
343 : // connection preconnect.
344 : bool shouldCreateNewConnection(float global_preconnect_ratio) const;
345 :
346 : float perUpstreamPreconnectRatio() const;
347 :
348 : ConnectionPool::Cancellable*
349 173 : addPendingStream(Envoy::ConnectionPool::PendingStreamPtr&& pending_stream) {
350 173 : LinkedList::moveIntoList(std::move(pending_stream), pending_streams_);
351 173 : state_.incrPendingStreams(1);
352 173 : return pending_streams_.front().get();
353 173 : }
354 :
355 0 : bool hasActiveStreams() const { return num_active_streams_ > 0; }
356 :
357 : Upstream::ClusterConnectivityState& state_;
358 :
359 : const Upstream::HostConstSharedPtr host_;
360 : const Upstream::ResourcePriority priority_;
361 :
362 : Event::Dispatcher& dispatcher_;
363 : const Network::ConnectionSocket::OptionsSharedPtr socket_options_;
364 : const Network::TransportSocketOptionsConstSharedPtr transport_socket_options_;
365 :
366 : // True if the max requests circuit breakers apply.
367 : // This will be false for the TCP pool, true otherwise.
368 202 : virtual bool enforceMaxRequests() const { return true; }
369 :
370 : std::list<Instance::IdleCb> idle_callbacks_;
371 :
372 : // When calling purgePendingStreams, this list will be used to hold the streams we are about
373 : // to purge. We need this if one cancelled streams cancels a different pending stream
374 : std::list<PendingStreamPtr> pending_streams_to_purge_;
375 :
376 : // Clients that are ready to handle additional streams.
377 : // All entries are in state Ready.
378 : std::list<ActiveClientPtr> ready_clients_;
379 :
380 : // Clients that are not ready to handle additional streams due to being Busy or Draining.
381 : std::list<ActiveClientPtr> busy_clients_;
382 :
383 : // Clients that are not ready to handle additional streams because they are Connecting.
384 : std::list<ActiveClientPtr> connecting_clients_;
385 :
386 : // Clients that are ready to handle additional early data streams because they have 0-RTT
387 : // credentials.
388 : std::list<ActiveClientPtr> early_data_clients_;
389 :
390 : // The number of streams that can be immediately dispatched
391 : // if all Connecting connections become connected.
392 : uint32_t connecting_stream_capacity_{0};
393 :
394 : private:
395 : // Drain all the clients in the given list.
396 : // Prerequisite: the given clients shouldn't be idle.
397 : void drainClients(std::list<ActiveClientPtr>& clients);
398 :
399 : std::list<PendingStreamPtr> pending_streams_;
400 :
401 : // The number of streams currently attached to clients.
402 : uint32_t num_active_streams_{0};
403 :
404 : // Whether the connection pool is currently in the process of closing
405 : // all connections so that it can be gracefully deleted.
406 : bool is_draining_for_deletion_{false};
407 :
408 : // True iff this object is in the deferred delete list.
409 : bool deferred_deleting_{false};
410 :
411 : Event::SchedulableCallbackPtr upstream_ready_cb_;
412 : Common::DebugRecursionChecker recursion_checker_;
413 : };
414 :
415 : } // namespace ConnectionPool
416 : } // namespace Envoy
417 :
418 : // NOLINT(namespace-envoy)
419 : namespace fmt {
420 : template <> struct formatter<Envoy::ConnectionPool::ConnPoolImplBase> : ostream_formatter {};
421 : } // namespace fmt
|