#include "source/common/conn_pool/conn_pool_base.h"

#include "source/common/common/assert.h"
#include "source/common/common/debug_recursion_checker.h"
#include "source/common/network/transport_socket_options_impl.h"
#include "source/common/runtime/runtime_features.h"
#include "source/common/stats/timespan_impl.h"
#include "source/common/upstream/upstream_impl.h"

namespace Envoy {
namespace ConnectionPool {
namespace {
[[maybe_unused]] ssize_t connectingCapacity(const std::list<ActiveClientPtr>& connecting_clients) {
  ssize_t ret = 0;
  for (const auto& client : connecting_clients) {
    ret += client->currentUnusedCapacity();
  }
  return ret;
}
} // namespace

ConnPoolImplBase::ConnPoolImplBase(
    Upstream::HostConstSharedPtr host, Upstream::ResourcePriority priority,
    Event::Dispatcher& dispatcher, const Network::ConnectionSocket::OptionsSharedPtr& options,
    const Network::TransportSocketOptionsConstSharedPtr& transport_socket_options,
    Upstream::ClusterConnectivityState& state)
    : state_(state), host_(host), priority_(priority), dispatcher_(dispatcher),
      socket_options_(options), transport_socket_options_(transport_socket_options),
      upstream_ready_cb_(dispatcher_.createSchedulableCallback([this]() { onUpstreamReady(); })) {}

ConnPoolImplBase::~ConnPoolImplBase() {
  ASSERT(isIdleImpl());
  ASSERT(connecting_stream_capacity_ == 0);
}

void ConnPoolImplBase::deleteIsPendingImpl() {
  deferred_deleting_ = true;
  ASSERT(isIdleImpl());
  ASSERT(connecting_stream_capacity_ == 0);
}

void ConnPoolImplBase::destructAllConnections() {
  for (auto* list : {&ready_clients_, &busy_clients_, &connecting_clients_, &early_data_clients_}) {
    while (!list->empty()) {
      list->front()->close();
    }
  }

  // Make sure all clients are destroyed before we are destroyed.
  dispatcher_.clearDeferredDeleteList();
}

bool ConnPoolImplBase::shouldConnect(size_t pending_streams, size_t active_streams,
                                     int64_t connecting_and_connected_capacity,
                                     float preconnect_ratio, bool anticipate_incoming_stream) {
  // This is set to true any time global preconnect is being calculated.
  // ClusterManagerImpl::maybePreconnect is called directly before a stream is created, so the
  // stream must be anticipated.
  //
  // Also without this, we would never pre-establish a connection as the first
  // connection in a pool because pending/active streams could both be 0.
  int anticipated_streams = anticipate_incoming_stream ? 1 : 0;

  // The number of streams we want to be provisioned for is the number of
  // pending, active, and anticipated streams times the preconnect ratio.
  // The number of streams we are (theoretically) provisioned for is the
  // connecting stream capacity plus the number of active streams.
  //
  // If the preconnect ratio is not set, it defaults to 1, and this simplifies to the
  // legacy check of pending_streams_.size() > connecting_stream_capacity_.
  return (pending_streams + active_streams + anticipated_streams) * preconnect_ratio >
         connecting_and_connected_capacity + active_streams;
}
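
// Worked example of the inequality above (hypothetical numbers, not taken from
// any particular configuration): with 2 pending streams, 3 active streams, a
// connecting/connected capacity of 4, a preconnect ratio of 1.5, and one
// anticipated stream, (2 + 3 + 1) * 1.5 = 9 > 4 + 3 = 7, so a new connection
// should be established. With the default ratio of 1.0 and no anticipated
// stream, the same state gives (2 + 3) * 1.0 = 5, which is not greater than 7,
// so no new connection is needed.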

bool ConnPoolImplBase::shouldCreateNewConnection(float global_preconnect_ratio) const {
  // If the host is not healthy, don't make it do extra work, especially as
  // upstream selection logic may result in bypassing this upstream entirely.
  // If an Envoy user wants preconnecting for degraded upstreams this could be
  // added later via extending the preconnect config.
  if (host_->coarseHealth() != Upstream::Host::Health::Healthy) {
    return pending_streams_.size() > connecting_stream_capacity_;
  }

  // Determine if we are trying to prefetch for global preconnect or local preconnect.
  if (global_preconnect_ratio != 0) {
    // If global preconnecting is on, and this connection is within the global
    // preconnect limit, preconnect.
    // For global preconnect, we anticipate an incoming stream to this pool, since it is
    // prefetching for the next upcoming stream, which will likely be assigned to this pool.
    // We may eventually want to track preconnect_attempts to allow more preconnecting for
    // heavily weighted upstreams or sticky picks.
    return shouldConnect(pending_streams_.size(), num_active_streams_, connecting_stream_capacity_,
                         global_preconnect_ratio, true);
  } else {
    // Ensure this local pool has adequate connections for the given load.
    //
    // Local preconnect does not need to anticipate a stream. It is called as
    // new streams are established or torn down and simply attempts to maintain
    // the correct ratio of streams and anticipated capacity.
    return shouldConnect(pending_streams_.size(), num_active_streams_, connecting_stream_capacity_,
                         perUpstreamPreconnectRatio());
  }
}

float ConnPoolImplBase::perUpstreamPreconnectRatio() const {
  return host_->cluster().perUpstreamPreconnectRatio();
}

ConnPoolImplBase::ConnectionResult ConnPoolImplBase::tryCreateNewConnections() {
  ConnPoolImplBase::ConnectionResult result;
  // Somewhat arbitrarily cap the number of connections preconnected due to new
  // incoming connections. The preconnect ratio is capped at 3, so in steady
  // state, no more than 3 connections should be preconnected. If hosts go
  // unhealthy, and connections are not immediately preconnected, it could be that
  // many connections are desired when the host becomes healthy again, but
  // overwhelming it with connections is not desirable.
  for (int i = 0; i < 3; ++i) {
    result = tryCreateNewConnection();
    if (result != ConnectionResult::CreatedNewConnection) {
      break;
    }
  }
  ASSERT(!is_draining_for_deletion_ || result != ConnectionResult::CreatedNewConnection);
  return result;
}

ConnPoolImplBase::ConnectionResult
ConnPoolImplBase::tryCreateNewConnection(float global_preconnect_ratio) {
  // There are already enough Connecting connections for the number of queued streams.
  if (!shouldCreateNewConnection(global_preconnect_ratio)) {
    ENVOY_LOG(trace, "not creating a new connection, shouldCreateNewConnection returned false.");
    return ConnectionResult::ShouldNotConnect;
  }

  const bool can_create_connection = host_->canCreateConnection(priority_);

  if (!can_create_connection) {
    host_->cluster().trafficStats()->upstream_cx_overflow_.inc();
  }
  // If we are at the connection circuit-breaker limit due to other upstreams having
  // too many open connections, and this upstream has no connections, always create one, to
  // prevent pending streams being queued to this upstream with no way to be processed.
  if (can_create_connection || (ready_clients_.empty() && busy_clients_.empty() &&
                                connecting_clients_.empty() && early_data_clients_.empty())) {
    ENVOY_LOG(debug, "creating a new connection (connecting={})", connecting_clients_.size());
    ActiveClientPtr client = instantiateActiveClient();
    if (client.get() == nullptr) {
      ENVOY_LOG(trace, "connection creation failed");
      return ConnectionResult::FailedToCreateConnection;
    }
    ASSERT(client->state() == ActiveClient::State::Connecting);
    ASSERT(std::numeric_limits<uint64_t>::max() - connecting_stream_capacity_ >=
           static_cast<uint64_t>(client->currentUnusedCapacity()));
    ASSERT(client->real_host_description_);
    // Increase the connecting capacity to reflect the streams this connection can serve.
    incrConnectingAndConnectedStreamCapacity(client->currentUnusedCapacity(), *client);
    LinkedList::moveIntoList(std::move(client), owningList(client->state()));
    return can_create_connection ? ConnectionResult::CreatedNewConnection
                                 : ConnectionResult::CreatedButRateLimited;
  } else {
    ENVOY_LOG(trace, "not creating a new connection: connection constrained");
    return ConnectionResult::NoConnectionRateLimited;
  }
}
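
// Summary of the possible results, derived directly from the logic above:
//   ShouldNotConnect         - the preconnect math says existing capacity is sufficient.
//   CreatedNewConnection     - a client was created within the circuit-breaker limits.
//   CreatedButRateLimited    - a client was created despite the limit, because the pool
//                              had no connection at all.
//   NoConnectionRateLimited  - the circuit breaker blocked creation.
//   FailedToCreateConnection - instantiateActiveClient() returned nullptr.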

void ConnPoolImplBase::attachStreamToClient(Envoy::ConnectionPool::ActiveClient& client,
                                            AttachContext& context) {
  ASSERT(client.readyForStream());

  Upstream::ClusterTrafficStats& traffic_stats = *host_->cluster().trafficStats();
  if (client.state() == Envoy::ConnectionPool::ActiveClient::State::ReadyForEarlyData) {
    traffic_stats.upstream_rq_0rtt_.inc();
  }

  if (enforceMaxRequests() && !host_->cluster().resourceManager(priority_).requests().canCreate()) {
    ENVOY_LOG(debug, "max streams overflow");
    onPoolFailure(client.real_host_description_, absl::string_view(),
                  ConnectionPool::PoolFailureReason::Overflow, context);
    traffic_stats.upstream_rq_pending_overflow_.inc();
    return;
  }
  ENVOY_CONN_LOG(debug, "creating stream", client);

  // Latch capacity before updating remaining streams.
  uint64_t capacity = client.currentUnusedCapacity();
  client.remaining_streams_--;
  if (client.remaining_streams_ == 0) {
    ENVOY_CONN_LOG(debug, "maximum streams per connection, start draining", client);
    traffic_stats.upstream_cx_max_requests_.inc();
    transitionActiveClientState(client, Envoy::ConnectionPool::ActiveClient::State::Draining);
  } else if (capacity == 1) {
    // As soon as the new stream is created, the client will be maxed out.
    transitionActiveClientState(client, Envoy::ConnectionPool::ActiveClient::State::Busy);
  }

  // Decrement the capacity, as there's one less stream available for serving.
  // For HTTP/3, the capacity is updated in newStreamEncoder.
  if (trackStreamCapacity()) {
    decrConnectingAndConnectedStreamCapacity(1, client);
  }
  // Track the new active stream.
  state_.incrActiveStreams(1);
  num_active_streams_++;
  host_->stats().rq_total_.inc();
  host_->stats().rq_active_.inc();
  traffic_stats.upstream_rq_total_.inc();
  traffic_stats.upstream_rq_active_.inc();
  host_->cluster().resourceManager(priority_).requests().inc();

  onPoolReady(client, context);
}

void ConnPoolImplBase::onStreamClosed(Envoy::ConnectionPool::ActiveClient& client,
                                      bool delay_attaching_stream) {
  ENVOY_CONN_LOG(debug, "destroying stream: {} remaining", client, client.numActiveStreams());
  ASSERT(num_active_streams_ > 0);
  state_.decrActiveStreams(1);
  num_active_streams_--;
  host_->stats().rq_active_.dec();
  host_->cluster().trafficStats()->upstream_rq_active_.dec();
  host_->cluster().resourceManager(priority_).requests().dec();
  // We don't update the capacity for HTTP/3 as the stream count should only
  // increase when a MAX_STREAMS frame is received.
  if (trackStreamCapacity()) {
    // If the effective client capacity was limited by concurrency, increase connecting capacity.
    bool limited_by_concurrency =
        client.remaining_streams_ > client.concurrent_stream_limit_ - client.numActiveStreams() - 1;
    // The capacity calculated from concurrency could be negative if a SETTINGS frame lowered the
    // number of allowed streams. In that case the effective client capacity was still limited by
    // concurrency; compare client.concurrent_stream_limit_ and client.numActiveStreams() directly
    // to avoid overflow.
    bool negative_capacity = client.concurrent_stream_limit_ < client.numActiveStreams() + 1;
    if (negative_capacity || limited_by_concurrency) {
      incrConnectingAndConnectedStreamCapacity(1, client);
    }
  }
  if (client.state() == ActiveClient::State::Draining && client.numActiveStreams() == 0) {
    // Close out the draining client if we no longer have active streams.
    client.close();
  } else if (client.state() == ActiveClient::State::Busy && client.currentUnusedCapacity() > 0) {
    if (!client.hasHandshakeCompleted()) {
      transitionActiveClientState(client, ActiveClient::State::ReadyForEarlyData);
      if (!delay_attaching_stream) {
        onUpstreamReadyForEarlyData(client);
      }
    } else {
      transitionActiveClientState(client, ActiveClient::State::Ready);
      if (!delay_attaching_stream) {
        onUpstreamReady();
      }
    }
  }
}
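
// Illustrative arithmetic for the capacity bookkeeping above (hypothetical
// numbers, assuming unused capacity behaves as min(remaining_streams_,
// concurrent_stream_limit_ - numActiveStreams()), as the checks imply): with
// concurrent_stream_limit_ = 10 and remaining_streams_ = 100, a close that
// drops numActiveStreams() from 10 to 9 satisfies limited_by_concurrency
// (100 > 10 - 9 - 1), and unused capacity goes from min(100, 0) = 0 to
// min(100, 1) = 1, so one unit of connecting/connected capacity is restored.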

ConnectionPool::Cancellable* ConnPoolImplBase::newStreamImpl(AttachContext& context,
                                                             bool can_send_early_data) {
  ASSERT(!is_draining_for_deletion_);
  ASSERT(!deferred_deleting_);

  ASSERT(static_cast<ssize_t>(connecting_stream_capacity_) ==
         connectingCapacity(connecting_clients_) +
             connectingCapacity(early_data_clients_)); // O(n) debug check.
  if (!ready_clients_.empty()) {
    ActiveClient& client = *ready_clients_.front();
    ENVOY_CONN_LOG(debug, "using existing fully connected connection", client);
    attachStreamToClient(client, context);
    // Even if there's a ready client, we may want to preconnect to handle the next incoming stream.
    tryCreateNewConnections();
    return nullptr;
  }

  if (can_send_early_data && !early_data_clients_.empty()) {
    ActiveClient& client = *early_data_clients_.front();
    ENVOY_CONN_LOG(debug, "using existing early data ready connection", client);
    attachStreamToClient(client, context);
    // Even if there's an available client, we may want to preconnect to handle the next
    // incoming stream.
    tryCreateNewConnections();
    return nullptr;
  }

  if (!host_->cluster().resourceManager(priority_).pendingRequests().canCreate()) {
    ENVOY_LOG(debug, "max pending streams overflow");
    onPoolFailure(nullptr, absl::string_view(), ConnectionPool::PoolFailureReason::Overflow,
                  context);
    host_->cluster().trafficStats()->upstream_rq_pending_overflow_.inc();
    return nullptr;
  }

  ConnectionPool::Cancellable* pending = newPendingStream(context, can_send_early_data);
  ENVOY_LOG(debug, "trying to create new connection");
  ENVOY_LOG(trace, fmt::format("{}", *this));

  auto old_capacity = connecting_stream_capacity_;
  // This must come after newPendingStream() because this function uses the
  // length of pending_streams_ to determine if a new connection is needed.
  const ConnectionResult result = tryCreateNewConnections();
  // If there is not enough connecting capacity, the only reason to not
  // increase capacity is if the connection limits are exceeded.
  ENVOY_BUG(pending_streams_.size() <= connecting_stream_capacity_ ||
                connecting_stream_capacity_ > old_capacity ||
                (result == ConnectionResult::NoConnectionRateLimited ||
                 result == ConnectionResult::FailedToCreateConnection),
            fmt::format("Failed to create expected connection: {}", *this));
  if (result == ConnectionResult::FailedToCreateConnection) {
    // This currently only happens for HTTP/3 if secrets aren't yet loaded.
    // Trigger connection failure.
    pending->cancel(Envoy::ConnectionPool::CancelPolicy::CloseExcess);
    onPoolFailure(nullptr, absl::string_view(),
                  ConnectionPool::PoolFailureReason::LocalConnectionFailure, context);
    return nullptr;
  }
  return pending;
}
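
// Caller contract implied by the function above (a summary, not new behavior):
// a nullptr return means the stream was resolved synchronously, either attached
// to a client via onPoolReady() or failed via onPoolFailure(); a non-null
// Cancellable means the stream was parked in pending_streams_ and the caller
// may later invoke cancel() on the returned handle.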

bool ConnPoolImplBase::maybePreconnectImpl(float global_preconnect_ratio) {
  ASSERT(!deferred_deleting_);
  return tryCreateNewConnection(global_preconnect_ratio) == ConnectionResult::CreatedNewConnection;
}

void ConnPoolImplBase::scheduleOnUpstreamReady() {
  upstream_ready_cb_->scheduleCallbackCurrentIteration();
}

void ConnPoolImplBase::onUpstreamReady() {
  while (!pending_streams_.empty() && !ready_clients_.empty()) {
    ActiveClientPtr& client = ready_clients_.front();
    ENVOY_CONN_LOG(debug, "attaching to next stream", *client);
    // Pending streams are pushed onto the front, so pull from the back.
    attachStreamToClient(*client, pending_streams_.back()->context());
    state_.decrPendingStreams(1);
    pending_streams_.pop_back();
  }
  if (!pending_streams_.empty()) {
    tryCreateNewConnections();
  }
}

std::list<ActiveClientPtr>& ConnPoolImplBase::owningList(ActiveClient::State state) {
  switch (state) {
  case ActiveClient::State::Connecting:
    return connecting_clients_;
  case ActiveClient::State::ReadyForEarlyData:
    return early_data_clients_;
  case ActiveClient::State::Ready:
    return ready_clients_;
  case ActiveClient::State::Busy:
  case ActiveClient::State::Draining:
    return busy_clients_;
  case ActiveClient::State::Closed:
    break; // Fall through to PANIC.
  }
  PANIC("unexpected");
}

void ConnPoolImplBase::transitionActiveClientState(ActiveClient& client,
                                                   ActiveClient::State new_state) {
  auto& old_list = owningList(client.state());
  auto& new_list = owningList(new_state);
  client.setState(new_state);

  // old_list and new_list can be equal when transitioning from Busy to Draining.
  //
  // The documentation for list.splice() (which is what moveBetweenLists() calls) is
  // unclear about whether it is allowed for src and dst to be the same, so check here
  // since the transfer would be a no-op anyway.
  if (&old_list != &new_list) {
    client.moveBetweenLists(old_list, new_list);
  }
}

void ConnPoolImplBase::addIdleCallbackImpl(Instance::IdleCb cb) { idle_callbacks_.push_back(cb); }

void ConnPoolImplBase::closeIdleConnectionsForDrainingPool() {
  Common::AutoDebugRecursionChecker assert_not_in(recursion_checker_);

  // Create a separate list of elements to close to avoid mutate-while-iterating problems.
  std::list<ActiveClient*> to_close;

  for (auto& client : ready_clients_) {
    if (client->numActiveStreams() == 0) {
      to_close.push_back(client.get());
    }
  }

  if (pending_streams_.empty()) {
    for (auto& client : connecting_clients_) {
      to_close.push_back(client.get());
    }
    for (auto& client : early_data_clients_) {
      if (client->numActiveStreams() == 0) {
        to_close.push_back(client.get());
      }
    }
  }

  for (auto& entry : to_close) {
    ENVOY_LOG_EVENT(debug, "closing_idle_client", "closing idle client {} for cluster {}",
                    entry->id(), host_->cluster().name());
    entry->close();
  }
}

void ConnPoolImplBase::drainClients(std::list<ActiveClientPtr>& clients) {
  while (!clients.empty()) {
    ASSERT(clients.front()->numActiveStreams() > 0u);
    ENVOY_LOG_EVENT(
        debug, "draining_non_idle_client", "draining {} client {} for cluster {}",
        (clients.front()->state() == ActiveClient::State::Ready ? "ready" : "early data"),
        clients.front()->id(), host_->cluster().name());
    transitionActiveClientState(*clients.front(), ActiveClient::State::Draining);
  }
}

void ConnPoolImplBase::drainConnectionsImpl(DrainBehavior drain_behavior) {
  if (drain_behavior == Envoy::ConnectionPool::DrainBehavior::DrainAndDelete) {
    is_draining_for_deletion_ = true;
    checkForIdleAndCloseIdleConnsIfDraining();
    return;
  }
  closeIdleConnectionsForDrainingPool();
  // closeIdleConnectionsForDrainingPool() closes all connections in ready_clients_ with no active
  // streams and, if there are no pending streams, all connections in early_data_clients_ with no
  // active streams as well, so all remaining entries in ready_clients_ are serving streams. Move
  // them and all entries in busy_clients_ to draining.
  if (pending_streams_.empty()) {
    // The remaining early data clients are non-idle.
    drainClients(early_data_clients_);
  }

  drainClients(ready_clients_);

  // Changing busy_clients_ to Draining does not move them between lists,
  // so use a for-loop since the list is not mutated.
  ASSERT(&owningList(ActiveClient::State::Draining) == &busy_clients_);
  for (auto& busy_client : busy_clients_) {
    if (busy_client->state() == ActiveClient::State::Draining) {
      continue;
    }
    ENVOY_LOG_EVENT(debug, "draining_busy_client", "draining busy client {} for cluster {}",
                    busy_client->id(), host_->cluster().name());
    transitionActiveClientState(*busy_client, ActiveClient::State::Draining);
  }
}

bool ConnPoolImplBase::isIdleImpl() const {
  return pending_streams_.empty() && ready_clients_.empty() && busy_clients_.empty() &&
         connecting_clients_.empty() && early_data_clients_.empty();
}

/*
This method may be invoked once or twice.
It is called the first time in ConnPoolImplBase::onConnectionEvent for Local/RemoteClose events.
The second time it is called from Envoy::Tcp::ActiveTcpClient::~ActiveTcpClient via
ConnPoolImplBase::checkForIdleAndCloseIdleConnsIfDraining.

The logic must be constructed in such a way that the method tolerates being called either once
or twice. See the description of PR 30807 for an explanation of why idle callbacks are deleted
after being called.
*/
void ConnPoolImplBase::checkForIdleAndNotify() {
  if (isIdleImpl()) {
    ENVOY_LOG(debug, "invoking {} idle callback(s) - is_draining_for_deletion_={}",
              idle_callbacks_.size(), is_draining_for_deletion_);
    for (const Instance::IdleCb& cb : idle_callbacks_) {
      cb();
    }
    // Clear the callbacks so they are not executed if checkForIdleAndNotify is called again.
    idle_callbacks_.clear();
  }
}

void ConnPoolImplBase::checkForIdleAndCloseIdleConnsIfDraining() {
  if (is_draining_for_deletion_) {
    closeIdleConnectionsForDrainingPool();
  }

  checkForIdleAndNotify();
}

void ConnPoolImplBase::onConnectionEvent(ActiveClient& client, absl::string_view failure_reason,
                                         Network::ConnectionEvent event) {
  switch (event) {
  case Network::ConnectionEvent::RemoteClose:
  case Network::ConnectionEvent::LocalClose: {
    if (client.connect_timer_) {
      ASSERT(!client.has_handshake_completed_);
      client.connect_timer_->disableTimer();
      client.connect_timer_.reset();
    }
    decrConnectingAndConnectedStreamCapacity(client.currentUnusedCapacity(), client);

    // Make sure that onStreamClosed won't double count.
    client.remaining_streams_ = 0;
    // The client died.
    ENVOY_CONN_LOG(debug, "client disconnected, failure reason: {}", client, failure_reason);

    Envoy::Upstream::reportUpstreamCxDestroy(host_, event);
    const bool incomplete_stream = client.closingWithIncompleteStream();
    if (incomplete_stream) {
      Envoy::Upstream::reportUpstreamCxDestroyActiveRequest(host_, event);
    }

    if (!client.hasHandshakeCompleted()) {
      client.has_handshake_completed_ = true;
      host_->cluster().trafficStats()->upstream_cx_connect_fail_.inc();
      host_->stats().cx_connect_fail_.inc();

      onConnectFailed(client);
      // Purge pending streams only if this client doesn't contribute to the local connecting
      // stream capacity. In other words, the remaining clients would be able to handle all the
      // pending streams once they are connected.
      ConnectionPool::PoolFailureReason reason;
      if (client.timed_out_) {
        reason = ConnectionPool::PoolFailureReason::Timeout;
      } else if (event == Network::ConnectionEvent::RemoteClose) {
        reason = ConnectionPool::PoolFailureReason::RemoteConnectionFailure;
      } else {
        reason = ConnectionPool::PoolFailureReason::LocalConnectionFailure;
      }

      // Raw connect failures should never happen under normal circumstances. If we have an
      // upstream that is behaving badly, streams can get stuck here in the pending state. If we
      // see a connect failure, we purge all pending streams so that calling code can determine
      // what to do with the stream.
      // NOTE: We move the existing pending streams to a temporary list. This is done so that
      // if retry logic submits a new stream to the pool, we don't fail it inline.
      purgePendingStreams(client.real_host_description_, failure_reason, reason);
      // See if we should preconnect based on active connections.
      if (!is_draining_for_deletion_) {
        tryCreateNewConnections();
      }
    }

    // We need to release our resourceManager() resources before checking below for
    // whether we can create a new connection. Normally this would happen when the
    // client's destructor runs, but this object needs to be deferredDelete'd(), so
    // this forces part of its cleanup to happen now.
    client.releaseResources();

    // Again, since we know this object is going to be deferredDelete'd(), we take
    // this opportunity to disable and reset the connection duration timer so that
    // it doesn't trigger while on the deferred delete list. In theory it is safe
    // to handle the Closed state in onConnectionDurationTimeout, but we handle
    // it here for simplicity and safety anyway.
    if (client.connection_duration_timer_) {
      client.connection_duration_timer_->disableTimer();
      client.connection_duration_timer_.reset();
    }

    dispatcher_.deferredDelete(client.removeFromList(owningList(client.state())));

    // Check if the pool transitioned to the idle state after removing the closed client
    // from one of the client tracking lists.
    // There is no need to check whether other connections are idle in a draining pool
    // because the pool will close all idle connections when it starts to drain.
    // Trying to close other connections here can lead to deep recursion when
    // a large number of idle connections are closed at the start of a pool drain.
    // See CdsIntegrationTest.CdsClusterDownWithLotsOfIdleConnections for an example.
    checkForIdleAndNotify();

    client.setState(ActiveClient::State::Closed);

    // If we have pending streams and we just lost a connection, we should make a new one.
    if (!pending_streams_.empty()) {
      tryCreateNewConnections();
    }
    break;
  }
  case Network::ConnectionEvent::Connected: {
    ASSERT(client.connect_timer_ != nullptr && !client.has_handshake_completed_);
    client.connect_timer_->disableTimer();
    client.connect_timer_.reset();

    ASSERT(connecting_stream_capacity_ >= client.currentUnusedCapacity());
    connecting_stream_capacity_ -= client.currentUnusedCapacity();
    client.has_handshake_completed_ = true;
    client.conn_connect_ms_->complete();
    client.conn_connect_ms_.reset();
    if (client.state() == ActiveClient::State::Connecting ||
        client.state() == ActiveClient::State::ReadyForEarlyData) {
      transitionActiveClientState(client,
                                  (client.currentUnusedCapacity() > 0 ? ActiveClient::State::Ready
                                                                      : ActiveClient::State::Busy));
    }

    // Now that the active client is ready, set up a timer for max connection duration.
    const absl::optional<std::chrono::milliseconds> max_connection_duration =
        client.parent_.host()->cluster().maxConnectionDuration();
    if (max_connection_duration.has_value()) {
      client.connection_duration_timer_ = client.parent_.dispatcher().createTimer(
          [&client]() { client.onConnectionDurationTimeout(); });
      client.connection_duration_timer_->enableTimer(max_connection_duration.value());
    }
    // Initialize the client's read filters.
    if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.initialize_upstream_filters")) {
      client.initializeReadFilters();
    }
    // At this point, for the mixed ALPN pool, the client may be deleted. Do not
    // refer to the client after this point.
    onConnected(client);
    if (client.readyForStream()) {
      onUpstreamReady();
    }
    checkForIdleAndCloseIdleConnsIfDraining();
    break;
  }
  case Network::ConnectionEvent::ConnectedZeroRtt: {
    ENVOY_CONN_LOG(debug, "0-RTT connected with capacity {}", client,
                   client.currentUnusedCapacity());
    // No need to update connecting capacity and connect_timer_ as the client is still connecting.
    ASSERT(client.state() == ActiveClient::State::Connecting);
    host()->cluster().trafficStats()->upstream_cx_connect_with_0_rtt_.inc();
    transitionActiveClientState(client, (client.currentUnusedCapacity() > 0
                                             ? ActiveClient::State::ReadyForEarlyData
                                             : ActiveClient::State::Busy));
    break;
  }
  }
}

PendingStream::PendingStream(ConnPoolImplBase& parent, bool can_send_early_data)
    : parent_(parent), can_send_early_data_(can_send_early_data) {
  Upstream::ClusterTrafficStats& traffic_stats = *parent_.host()->cluster().trafficStats();
  traffic_stats.upstream_rq_pending_total_.inc();
  traffic_stats.upstream_rq_pending_active_.inc();
  parent_.host()->cluster().resourceManager(parent_.priority()).pendingRequests().inc();
}

PendingStream::~PendingStream() {
  parent_.host()->cluster().trafficStats()->upstream_rq_pending_active_.dec();
  parent_.host()->cluster().resourceManager(parent_.priority()).pendingRequests().dec();
}

void PendingStream::cancel(Envoy::ConnectionPool::CancelPolicy policy) {
  parent_.onPendingStreamCancel(*this, policy);
}

void ConnPoolImplBase::purgePendingStreams(
    const Upstream::HostDescriptionConstSharedPtr& host_description,
    absl::string_view failure_reason, ConnectionPool::PoolFailureReason reason) {
  // NOTE: We move the existing pending streams to a temporary list. This is done so that
  // if retry logic submits a new stream to the pool, we don't fail it inline.
  state_.decrPendingStreams(pending_streams_.size());
  pending_streams_to_purge_ = std::move(pending_streams_);
  while (!pending_streams_to_purge_.empty()) {
    PendingStreamPtr stream =
        pending_streams_to_purge_.front()->removeFromList(pending_streams_to_purge_);
    host_->cluster().trafficStats()->upstream_rq_pending_failure_eject_.inc();
    onPoolFailure(host_description, failure_reason, reason, stream->context());
  }
}

bool ConnPoolImplBase::connectingConnectionIsExcess(const ActiveClient& client) const {
  ASSERT(!client.hasHandshakeCompleted());
  ASSERT(connecting_stream_capacity_ >= client.currentUnusedCapacity());
  // If perUpstreamPreconnectRatio is one, this simplifies to checking if there would still be
  // sufficient connecting stream capacity to serve all pending streams if the most recent client
  // were removed from the picture.
  //
  // If preconnect ratio is set, it also factors in the anticipated load based on both queued
  // streams and active streams, and makes sure the connecting capacity would still be sufficient
  // to serve that even with the most recent client removed.
  return (pending_streams_.size() + num_active_streams_) * perUpstreamPreconnectRatio() <=
         (connecting_stream_capacity_ - client.currentUnusedCapacity() + num_active_streams_);
}
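
// Illustrative check (hypothetical numbers): with 1 pending stream, 2 active
// streams, a preconnect ratio of 1.0, connecting_stream_capacity_ = 5, and a
// client whose unused capacity is 4, (1 + 2) * 1.0 = 3 <= (5 - 4 + 2) = 3, so
// the client is excess and can be closed without starving pending streams.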

void ConnPoolImplBase::onPendingStreamCancel(PendingStream& stream,
                                             Envoy::ConnectionPool::CancelPolicy policy) {
  ENVOY_LOG(debug, "cancelling pending stream");
  if (!pending_streams_to_purge_.empty()) {
    // If pending_streams_to_purge_ is not empty, it means that we are called from
    // within an onPoolFailure callback invoked in purgePendingStreams (i.e. purgePendingStreams
    // is down in the call stack). Remove this stream from the list as it is cancelled,
    // and there is no need to call its onPoolFailure callback.
    stream.removeFromList(pending_streams_to_purge_);
  } else {
    state_.decrPendingStreams(1);
    stream.removeFromList(pending_streams_);
  }
  if (policy == Envoy::ConnectionPool::CancelPolicy::CloseExcess) {
    if (!connecting_clients_.empty() &&
        connectingConnectionIsExcess(*connecting_clients_.front())) {
      auto& client = *connecting_clients_.front();
      transitionActiveClientState(client, ActiveClient::State::Draining);
      client.close();
    } else if (!early_data_clients_.empty()) {
      for (ActiveClientPtr& client : early_data_clients_) {
        if (client->numActiveStreams() == 0) {
          // Found an idle early data client; check whether it is excess.
          if (connectingConnectionIsExcess(*client)) {
            // Draining moves the client to another list, so close it and break out of
            // the loop immediately to avoid invalidating the iterator.
            transitionActiveClientState(*client, ActiveClient::State::Draining);
            client->close();
          }
          break;
        }
      }
    }
  }

  host_->cluster().trafficStats()->upstream_rq_cancelled_.inc();
  checkForIdleAndCloseIdleConnsIfDraining();
}

void ConnPoolImplBase::decrConnectingAndConnectedStreamCapacity(uint32_t delta,
                                                                ActiveClient& client) {
  state_.decrConnectingAndConnectedStreamCapacity(delta);
  if (!client.hasHandshakeCompleted()) {
    // If still doing handshake, it is contributing to the local connecting stream capacity. Update
    // the capacity as well.
    ASSERT(connecting_stream_capacity_ >= delta);
    connecting_stream_capacity_ -= delta;
  }
}

void ConnPoolImplBase::incrConnectingAndConnectedStreamCapacity(uint32_t delta,
                                                                ActiveClient& client) {
  state_.incrConnectingAndConnectedStreamCapacity(delta);
  if (!client.hasHandshakeCompleted()) {
    connecting_stream_capacity_ += delta;
  }
}

void ConnPoolImplBase::onUpstreamReadyForEarlyData(ActiveClient& client) {
  ASSERT(!client.hasHandshakeCompleted() && client.readyForStream());
  // Walk the pending streams from the back (oldest first) looking for requests that are
  // safe to send as early data.
  // Note that this is an O(n) search, but the expected size of pending_streams_ should be small.
  // If this becomes a problem, we could split pending_streams_ into 2 lists.
  auto it = pending_streams_.end();
  if (it == pending_streams_.begin()) {
    return;
  }
  --it;
  while (client.currentUnusedCapacity() > 0) {
    PendingStream& stream = **it;
    bool stop_iteration{false};
    if (it != pending_streams_.begin()) {
      --it;
    } else {
      stop_iteration = true;
    }

    if (stream.can_send_early_data_) {
      ENVOY_CONN_LOG(debug, "creating stream for early data.", client);
      attachStreamToClient(client, stream.context());
      state_.decrPendingStreams(1);
      stream.removeFromList(pending_streams_);
    }
    if (stop_iteration) {
      return;
    }
  }
}
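
// Iteration-order sketch (hypothetical state): because pending streams are
// pushed onto the front of the list, a pending_streams_ of [S3, S2, S1] holds
// the newest stream S3 at the front and the oldest stream S1 at the back, so
// the backwards walk above considers S1, then S2, then S3, attaching each
// stream whose can_send_early_data_ flag is set while the client still has
// unused capacity.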

namespace {
// Translate zero to UINT32_MAX so that the zero/unlimited case doesn't
// have to be handled specially.
uint32_t translateZeroToUnlimited(uint32_t limit) {
  return (limit != 0) ? limit : std::numeric_limits<uint32_t>::max();
}
} // namespace
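
// For example (an inference from the code above, not new behavior): configuring
// an unlimited lifetime stream limit by passing 0 yields
// remaining_streams_ == UINT32_MAX, so the remaining_streams_ == 0 check in
// attachStreamToClient() effectively never triggers draining for such clients.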

ActiveClient::ActiveClient(ConnPoolImplBase& parent, uint32_t lifetime_stream_limit,
                           uint32_t concurrent_stream_limit)
    : ActiveClient(parent, lifetime_stream_limit, concurrent_stream_limit,
                   concurrent_stream_limit) {}

ActiveClient::ActiveClient(ConnPoolImplBase& parent, uint32_t lifetime_stream_limit,
                           uint32_t effective_concurrent_streams, uint32_t concurrent_stream_limit)
    : parent_(parent), remaining_streams_(translateZeroToUnlimited(lifetime_stream_limit)),
      configured_stream_limit_(translateZeroToUnlimited(effective_concurrent_streams)),
      concurrent_stream_limit_(translateZeroToUnlimited(concurrent_stream_limit)),
      connect_timer_(parent_.dispatcher().createTimer([this]() { onConnectTimeout(); })) {
  conn_connect_ms_ = std::make_unique<Stats::HistogramCompletableTimespanImpl>(
      parent_.host()->cluster().trafficStats()->upstream_cx_connect_ms_,
      parent_.dispatcher().timeSource());
  conn_length_ = std::make_unique<Stats::HistogramCompletableTimespanImpl>(
      parent_.host()->cluster().trafficStats()->upstream_cx_length_ms_,
      parent_.dispatcher().timeSource());
  connect_timer_->enableTimer(parent_.host()->cluster().connectTimeout());
  parent_.host()->stats().cx_total_.inc();
  parent_.host()->stats().cx_active_.inc();
  parent_.host()->cluster().trafficStats()->upstream_cx_total_.inc();
  parent_.host()->cluster().trafficStats()->upstream_cx_active_.inc();
  parent_.host()->cluster().resourceManager(parent_.priority()).connections().inc();
}

ActiveClient::~ActiveClient() { releaseResourcesBase(); }

void ActiveClient::releaseResourcesBase() {
  if (!resources_released_) {
    resources_released_ = true;

    conn_length_->complete();

    parent_.host()->cluster().trafficStats()->upstream_cx_active_.dec();
    parent_.host()->stats().cx_active_.dec();
    parent_.host()->cluster().resourceManager(parent_.priority()).connections().dec();
  }
}

void ActiveClient::onConnectTimeout() {
  ENVOY_CONN_LOG(debug, "connect timeout", *this);
  parent_.host()->cluster().trafficStats()->upstream_cx_connect_timeout_.inc();
  timed_out_ = true;
  close();
}

void ActiveClient::onConnectionDurationTimeout() {
  if (!hasHandshakeCompleted()) {
    // The connection duration timer should only have started after we were connected.
    ENVOY_BUG(false, "max connection duration reached while connecting");
    return;
  }

  if (state_ == ActiveClient::State::Closed) {
    // The connection duration timer should have been disabled and reset in onConnectionEvent
    // for closing connections.
    ENVOY_BUG(false, "max connection duration reached while closed");
    return;
  }

  // There's nothing to do if the client is draining.
  if (state_ == ActiveClient::State::Draining) {
    return;
  }

  ENVOY_CONN_LOG(debug, "max connection duration reached, start draining", *this);
  parent_.host()->cluster().trafficStats()->upstream_cx_max_duration_reached_.inc();
  parent_.transitionActiveClientState(*this, Envoy::ConnectionPool::ActiveClient::State::Draining);

  // Close out the draining client if we no longer have active streams.
  // We have to do this here because there won't be an onStreamClosed (because there are
  // no active streams) to do it for us later.
  if (numActiveStreams() == 0) {
    close();
  }
}

void ActiveClient::drain() {
  if (currentUnusedCapacity() <= 0) {
    return;
  }
  parent_.decrConnectingAndConnectedStreamCapacity(currentUnusedCapacity(), *this);

  remaining_streams_ = 0;
}

} // namespace ConnectionPool
} // namespace Envoy
|