1
#include "source/common/conn_pool/conn_pool_base.h"
2

            
3
#include "envoy/server/overload/load_shed_point.h"
4

            
5
#include "source/common/common/assert.h"
6
#include "source/common/common/debug_recursion_checker.h"
7
#include "source/common/network/transport_socket_options_impl.h"
8
#include "source/common/runtime/runtime_features.h"
9
#include "source/common/stats/timespan_impl.h"
10
#include "source/common/upstream/upstream_impl.h"
11

            
12
namespace Envoy {
13
namespace ConnectionPool {
14
namespace {
15
int64_t currentUnusedCapacity(const std::list<ActiveClientPtr>& connecting_clients) {
16
  int64_t ret = 0;
17
  for (const auto& client : connecting_clients) {
18
    ret += client->currentUnusedCapacity();
19
  }
20
  return ret;
21
}
22
} // namespace
23

            
24
1
// Renders the pool's internal counters (via the operator<< / formatter overload
// for ConnPoolImplBase) for use in assertion and debug-log messages.
std::string ConnPoolImplBase::dumpState() const { return fmt::format("State: {}", *this); }
25

            
26
142942
// Debug-only consistency checks tying the pool's cached capacity counters to
// the per-client capacity summed over each client list. All checks are
// SLOW_ASSERT/ENVOY_BUG, so this is (mostly) compiled out of release builds.
void ConnPoolImplBase::assertCapacityCountsAreCorrect() {
  // `connecting_stream_capacity_` must equal the unused capacity summed over
  // clients whose handshake has not completed (connecting + early-data).
  SLOW_ASSERT(static_cast<int64_t>(connecting_stream_capacity_) ==
                  currentUnusedCapacity(connecting_clients_) +
                      currentUnusedCapacity(early_data_clients_),
              dumpState());

  // Note: must include `busy_clients_` because they can have negative current unused capacity,
  // which is included in `connecting_and_connected_stream_capacity_`.
  SLOW_ASSERT(
      connecting_and_connected_stream_capacity_ ==
          (static_cast<int64_t>(connecting_stream_capacity_) +
           currentUnusedCapacity(ready_clients_) + currentUnusedCapacity(busy_clients_)),
      fmt::format(
          "{} currentUnusedCapacity(ready_clients_) {}, currentUnusedCapacity(busy_clients_) {}",
          *this, currentUnusedCapacity(ready_clients_), currentUnusedCapacity(busy_clients_)));

  // Busy clients by definition have no spare capacity; their summed unused
  // capacity can only be zero or negative.
  SLOW_ASSERT(currentUnusedCapacity(busy_clients_) <= 0, dumpState());

  if (ready_clients_.empty()) {
    // No ready clients: all non-connecting capacity must be <= 0.
    ENVOY_BUG((connecting_and_connected_stream_capacity_ - connecting_stream_capacity_) <= 0,
              dumpState());
  } else {
    // At least one ready client implies strictly positive connected capacity.
    ENVOY_BUG((connecting_and_connected_stream_capacity_ - connecting_stream_capacity_) > 0,
              dumpState());
  }
}
52

            
53
// Constructs the base pool. Latches the (optional) load shed point used by
// tryCreateNewConnection() to reject new upstream connections under overload,
// and logs once (at trace) if that point is not configured.
ConnPoolImplBase::ConnPoolImplBase(
    Upstream::HostConstSharedPtr host, Upstream::ResourcePriority priority,
    Event::Dispatcher& dispatcher, const Network::ConnectionSocket::OptionsSharedPtr& options,
    const Network::TransportSocketOptionsConstSharedPtr& transport_socket_options,
    Upstream::ClusterConnectivityState& state, Server::OverloadManager& overload_manager)
    : host_(host), priority_(priority), dispatcher_(dispatcher), socket_options_(options),
      transport_socket_options_(transport_socket_options), cluster_connectivity_state_(state),
      // Deferred callback used to attach pending streams to ready clients
      // later in the current dispatcher iteration.
      upstream_ready_cb_(dispatcher_.createSchedulableCallback([this]() { onUpstreamReady(); })),
      create_new_connection_load_shed_(overload_manager.getLoadShedPoint(
          Server::LoadShedPointName::get().ConnectionPoolNewConnection)) {
  ENVOY_LOG_ONCE_IF(trace, create_new_connection_load_shed_ == nullptr,
                    "LoadShedPoint envoy.load_shed_points.connection_pool_new_connection is not "
                    "found. Is it configured?");
}
67

            
68
12755
// On destruction the pool must be idle (no pending streams or clients) and
// must not be tracking any stream capacity; anything else is an accounting bug.
ConnPoolImplBase::~ConnPoolImplBase() {
  ENVOY_BUG(isIdleImpl(), dumpState());
  ENVOY_BUG(connecting_stream_capacity_ == 0, dumpState());
  ENVOY_BUG(connecting_and_connected_stream_capacity_ == 0, dumpState());
}
73

            
74
12317
// Called when the pool is queued for deferred deletion. Marks the pool so that
// new streams are rejected (see the assertions in newStreamImpl) and checks
// the same idle/zero-capacity invariants as the destructor.
void ConnPoolImplBase::deleteIsPendingImpl() {
  deferred_deleting_ = true;
  ENVOY_BUG(isIdleImpl(), dumpState());
  ENVOY_BUG(connecting_stream_capacity_ == 0, dumpState());
  ENVOY_BUG(connecting_and_connected_stream_capacity_ == 0, dumpState());
}
80

            
81
13691
// Closes every client in every list. close() removes the client from its
// owning list, which is why each list is drained with a front()/close() loop
// rather than iterated directly (iteration would be invalidated).
void ConnPoolImplBase::destructAllConnections() {
  for (auto* list : {&ready_clients_, &busy_clients_, &connecting_clients_, &early_data_clients_}) {
    while (!list->empty()) {
      list->front()->close();
    }
  }

  // Make sure all clients are destroyed before we are destroyed.
  dispatcher_.clearDeferredDeleteList();
}
91

            
92
// Preconnect math shared by the global and per-upstream paths: returns true
// when the desired provisioned capacity exceeds what is already available.
bool ConnPoolImplBase::shouldConnect(size_t pending_streams, size_t active_streams,
                                     int64_t connecting_and_connected_capacity,
                                     float preconnect_ratio, bool anticipate_incoming_stream) {
  // `anticipate_incoming_stream` is set whenever global preconnect is being
  // calculated: ClusterManagerImpl::maybePreconnect runs directly before a
  // stream is created, so that stream must be counted. It also guarantees the
  // very first connection of a pool can be established even when both pending
  // and active stream counts are zero.
  const int anticipated = anticipate_incoming_stream ? 1 : 0;

  // Demand: (pending + active + anticipated) scaled by the preconnect ratio.
  // Supply: connecting/connected stream capacity plus the active streams it is
  // already serving. With the default ratio of 1 this reduces to the legacy
  // check of pending_streams_.size() > connecting_stream_capacity_.
  const float demanded_capacity =
      (pending_streams + active_streams + anticipated) * preconnect_ratio;
  const float provisioned_capacity = connecting_and_connected_capacity + active_streams;
  return demanded_capacity > provisioned_capacity;
}
113

            
114
82948
// Decides whether tryCreateNewConnection() should open another upstream
// connection. A non-zero `global_preconnect_ratio` selects the cluster-wide
// (predictive) preconnect path; otherwise the pool's per-upstream ratio is
// used.
bool ConnPoolImplBase::shouldCreateNewConnection(float global_preconnect_ratio) const {
  // If the host is not healthy, don't make it do extra work, especially as
  // upstream selection logic may result in bypassing this upstream entirely.
  // If an Envoy user wants preconnecting for degraded upstreams this could be
  // added later via extending the preconnect config.
  if (host_->coarseHealth() != Upstream::Host::Health::Healthy) {
    // Unhealthy host: only connect when there are more queued streams than
    // connecting capacity to serve them.
    return pending_streams_.size() > connecting_stream_capacity_;
  }

  // Determine if we are trying to prefetch for global preconnect or local preconnect.
  if (global_preconnect_ratio != 0) {
    // If global preconnecting is on, and this connection is within the global
    // preconnect limit, preconnect.
    // For global preconnect, we anticipate an incoming stream to this pool, since it is
    // prefetching for the next upcoming stream, which will likely be assigned to this pool.
    // We may eventually want to track preconnect_attempts to allow more preconnecting for
    // heavily weighted upstreams or sticky picks.
    bool result =
        shouldConnect(pending_streams_.size(), num_active_streams_,
                      connecting_and_connected_stream_capacity_, global_preconnect_ratio, true);
    ENVOY_LOG(trace,
              "predictive shouldCreateNewConnection returns {} for pending {} active {} "
              "connecting_and_connected_capacity {} connecting_capacity {} ratio {}",
              result, pending_streams_.size(), num_active_streams_,
              connecting_and_connected_stream_capacity_, connecting_stream_capacity_,
              global_preconnect_ratio);
    return result;
  } else {
    // Ensure this local pool has adequate connections for the given load.
    //
    // Local preconnect does not need to anticipate a stream. It is called as
    // new streams are established or torn down and simply attempts to maintain
    // the correct ratio of streams and anticipated capacity.
    bool result =
        shouldConnect(pending_streams_.size(), num_active_streams_,
                      connecting_and_connected_stream_capacity_, perUpstreamPreconnectRatio());
    ENVOY_LOG(trace,
              "per-upstream shouldCreateNewConnection returns {} for pending {} active {} "
              "connecting_and_connected_capacity {} connecting_capacity {} ratio {}",
              result, pending_streams_.size(), num_active_streams_,
              connecting_and_connected_stream_capacity_, connecting_stream_capacity_,
              perUpstreamPreconnectRatio());
    return result;
  }
}
159

            
160
83108
// Fetches the per-upstream preconnect ratio from the cluster configuration.
float ConnPoolImplBase::perUpstreamPreconnectRatio() const {
  return host_->cluster().perUpstreamPreconnectRatio();
}
163

            
164
51870
// Attempts up to three connection creations, stopping at the first attempt
// that does not produce a new connection, and returns the last result.
ConnPoolImplBase::ConnectionResult ConnPoolImplBase::tryCreateNewConnections() {
  // The attempt count is somewhat arbitrarily capped at three: the preconnect
  // ratio is capped at 3, so in steady state no more than three connections
  // should ever be preconnected per call. If a host was unhealthy and comes
  // back, many connections may be desired at once, but flooding it with
  // connections is undesirable.
  ConnPoolImplBase::ConnectionResult latest = tryCreateNewConnection();
  int attempts = 1;
  while (attempts < 3 && latest == ConnectionResult::CreatedNewConnection) {
    latest = tryCreateNewConnection();
    ++attempts;
  }
  // A pool draining for deletion must never have opened a fresh connection.
  ASSERT(!is_draining_for_deletion_ || latest != ConnectionResult::CreatedNewConnection,
         dumpState());
  return latest;
}
182

            
183
// Attempts to create a single new upstream connection, if preconnect math says
// one is needed and neither the load shed point nor the connection circuit
// breaker forbids it.
ConnPoolImplBase::ConnectionResult
ConnPoolImplBase::tryCreateNewConnection(float global_preconnect_ratio) {
  // There are already enough Connecting connections for the number of queued streams.
  if (!shouldCreateNewConnection(global_preconnect_ratio)) {
    return ConnectionResult::ShouldNotConnect;
  }
  ENVOY_LOG(trace, "creating new preconnect connection");

  // Drop new connection attempts if the load shed point indicates overload.
  if (create_new_connection_load_shed_) {
    if (create_new_connection_load_shed_->shouldShedLoad()) {
      return ConnectionResult::LoadShed;
    }
  }

  const bool can_create_connection = host_->canCreateConnection(priority_);

  if (!can_create_connection) {
    host_->cluster().trafficStats()->upstream_cx_overflow_.inc();
  }
  // If we are at the connection circuit-breaker limit due to other upstreams having
  // too many open connections, and this upstream has no connections, always create one, to
  // prevent pending streams being queued to this upstream with no way to be processed.
  if (can_create_connection || (ready_clients_.empty() && busy_clients_.empty() &&
                                connecting_clients_.empty() && early_data_clients_.empty())) {
    ENVOY_LOG(debug, "creating a new connection (connecting={})", connecting_clients_.size());
    ActiveClientPtr client = instantiateActiveClient();
    if (client.get() == nullptr) {
      ENVOY_LOG(trace, "connection creation failed");
      return ConnectionResult::FailedToCreateConnection;
    }
    ASSERT(client->state() == ActiveClient::State::Connecting, dumpState());
    // Guard against overflowing the aggregate connecting-capacity counter.
    ENVOY_BUG(std::numeric_limits<uint64_t>::max() - connecting_stream_capacity_ >=
                  static_cast<uint64_t>(client->currentUnusedCapacity()),
              dumpState());
    ASSERT(client->real_host_description_);
    // Increase the connecting capacity to reflect the streams this connection can serve.
    incrConnectingAndConnectedStreamCapacity(client->currentUnusedCapacity(), *client);
    LinkedList::moveIntoList(std::move(client), owningList(client->state()));
    assertCapacityCountsAreCorrect();
    // A connection created while over the circuit-breaker limit (the "always
    // create one" escape hatch above) is reported as rate limited.
    return can_create_connection ? ConnectionResult::CreatedNewConnection
                                 : ConnectionResult::CreatedButRateLimited;
  } else {
    ENVOY_LOG(trace, "not creating a new connection: connection constrained");
    return ConnectionResult::NoConnectionRateLimited;
  }
}
230

            
231
// Binds a stream to `client`: enforces the cluster request circuit breaker,
// updates per-client state, capacity accounting, and stats, then hands the
// stream to the concrete pool via onPoolReady(). On circuit-breaker overflow
// the stream fails with PoolFailureReason::Overflow instead.
void ConnPoolImplBase::attachStreamToClient(Envoy::ConnectionPool::ActiveClient& client,
                                            AttachContext& context) {
  ASSERT(client.readyForStream(), dumpState());

  Upstream::ClusterTrafficStats& traffic_stats = *host_->cluster().trafficStats();
  if (client.state() == Envoy::ConnectionPool::ActiveClient::State::ReadyForEarlyData) {
    // Stream sent on a connection whose handshake has not yet completed (0-RTT).
    traffic_stats.upstream_rq_0rtt_.inc();
  }

  if (enforceMaxRequests() && !host_->cluster().resourceManager(priority_).requests().canCreate()) {
    ENVOY_LOG(debug, "max streams overflow");
    onPoolFailure(client.real_host_description_, absl::string_view(),
                  ConnectionPool::PoolFailureReason::Overflow, context);
    traffic_stats.upstream_rq_pending_overflow_.inc();
    return;
  }
  ENVOY_CONN_LOG(debug, "creating stream", client);

  // Latch capacity before updating remaining streams.
  uint64_t capacity = client.currentUnusedCapacity();
  client.remaining_streams_--;
  if (client.remaining_streams_ == 0) {
    // The client hit its lifetime max-streams limit: drain it.
    ENVOY_CONN_LOG(debug, "maximum streams per connection, start draining", client);
    traffic_stats.upstream_cx_max_requests_.inc();
    transitionActiveClientState(client, Envoy::ConnectionPool::ActiveClient::State::Draining);
  } else if (capacity == 1) {
    // As soon as the new stream is created, the client will be maxed out.
    transitionActiveClientState(client, Envoy::ConnectionPool::ActiveClient::State::Busy);
  }

  // Decrement the capacity, as there's one less stream available for serving.
  // For HTTP/3, the capacity is updated in newStreamEncoder.
  if (trackStreamCapacity()) {
    decrConnectingAndConnectedStreamCapacity(1, client);
  }
  // Track the new active stream.
  cluster_connectivity_state_.incrActiveStreams(1);
  num_active_streams_++;
  host_->stats().rq_total_.inc();
  host_->stats().rq_active_.inc();
  traffic_stats.upstream_rq_total_.inc();
  traffic_stats.upstream_rq_active_.inc();
  host_->cluster().resourceManager(priority_).requests().inc();

  onPoolReady(client, context);
}
277

            
278
// Accounting and state transitions when a stream on `client` finishes:
// decrements stream counters/stats, restores capacity where concurrency was
// the limiting factor, and moves the client back toward Ready (or closes it
// if it is fully drained).
void ConnPoolImplBase::onStreamClosed(Envoy::ConnectionPool::ActiveClient& client,
                                      bool delay_attaching_stream) {
  ENVOY_CONN_LOG(
      debug, "destroying stream: {} active remaining, readyForStream {}, currentUnusedCapacity {}",
      client, client.numActiveStreams(), client.readyForStream(), client.currentUnusedCapacity());
  ASSERT(num_active_streams_ > 0, dumpState());
  cluster_connectivity_state_.decrActiveStreams(1);
  num_active_streams_--;
  host_->stats().rq_active_.dec();
  host_->cluster().trafficStats()->upstream_rq_active_.dec();
  host_->cluster().resourceManager(priority_).requests().dec();
  // We don't update the capacity for HTTP/3 as the stream count should only
  // increase when a MAX_STREAMS frame is received.
  if (trackStreamCapacity()) {
    // If the effective client capacity was limited by concurrency, increase connected capacity.
    bool limited_by_concurrency =
        client.remaining_streams_ > client.concurrent_stream_limit_ - client.numActiveStreams() - 1;
    // The capacity calculated by concurrency could be negative if a SETTINGS frame lowered the
    // number of allowed streams. In this case, connecting_and_connected_stream_capacity_ can be
    // negative, and effective client capacity was still limited by concurrency. Compare
    // client.concurrent_stream_limit_ and client.numActiveStreams() directly to avoid overflow.
    bool negative_capacity = client.concurrent_stream_limit_ < client.numActiveStreams() + 1;
    if (negative_capacity || limited_by_concurrency) {
      incrConnectingAndConnectedStreamCapacity(1, client);
    }
  }
  if (client.state() == ActiveClient::State::Draining && client.numActiveStreams() == 0) {
    // Close out the draining client if we no longer have active streams.
    client.close();
  } else if (client.state() == ActiveClient::State::Busy && client.currentUnusedCapacity() > 0) {
    // The client went from maxed out to having spare capacity: move it back to
    // a ready state and, unless asked to delay, serve queued streams now.
    if (!client.hasHandshakeCompleted()) {
      transitionActiveClientState(client, ActiveClient::State::ReadyForEarlyData);
      if (!delay_attaching_stream) {
        onUpstreamReadyForEarlyData(client);
      }
    } else {
      transitionActiveClientState(client, ActiveClient::State::Ready);
      if (!delay_attaching_stream) {
        onUpstreamReady();
      }
    }
  }
}
321

            
322
// Entry point for a new stream. Prefers a fully connected ready client, then
// (if permitted) an early-data client; otherwise queues the stream as pending
// and attempts to create connections to serve it. Returns a handle that can
// cancel the pending stream, or nullptr when the stream was attached or
// failed synchronously.
ConnectionPool::Cancellable* ConnPoolImplBase::newStreamImpl(AttachContext& context,
                                                             bool can_send_early_data) {
  ASSERT(!is_draining_for_deletion_, dumpState());
  ASSERT(!deferred_deleting_, dumpState());
  assertCapacityCountsAreCorrect();

  if (!ready_clients_.empty()) {
    ActiveClient& client = *ready_clients_.front();
    ENVOY_CONN_LOG(debug, "using existing fully connected connection", client);
    attachStreamToClient(client, context);
    // Even if there's a ready client, we may want to preconnect to handle the next incoming stream.
    tryCreateNewConnections();
    return nullptr;
  }

  if (can_send_early_data && !early_data_clients_.empty()) {
    ActiveClient& client = *early_data_clients_.front();
    ENVOY_CONN_LOG(debug, "using existing early data ready connection", client);
    attachStreamToClient(client, context);
    // Even if there's an available client, we may want to preconnect to handle the next
    // incoming stream.
    tryCreateNewConnections();
    return nullptr;
  }

  // Pending-request circuit breaker: refuse to queue beyond the limit.
  if (!host_->cluster().resourceManager(priority_).pendingRequests().canCreate()) {
    ENVOY_LOG(debug, "max pending streams overflow");
    onPoolFailure(nullptr, absl::string_view(), ConnectionPool::PoolFailureReason::Overflow,
                  context);
    host_->cluster().trafficStats()->upstream_rq_pending_overflow_.inc();
    return nullptr;
  }

  ConnectionPool::Cancellable* pending = newPendingStream(context, can_send_early_data);
  ENVOY_LOG(debug, "trying to create new connection");
  ENVOY_LOG(trace, fmt::format("{}", *this));

  auto old_capacity = connecting_stream_capacity_;
  // This must come after newPendingStream() because this function uses the
  // length of pending_streams_ to determine if a new connection is needed.
  const ConnectionResult result = tryCreateNewConnections();
  // If there is not enough connecting capacity, the only reason to not
  // increase capacity is if the connection limits are exceeded or load shed is
  // triggered.
  ENVOY_BUG(pending_streams_.size() <= connecting_stream_capacity_ ||
                connecting_stream_capacity_ > old_capacity ||
                (result == ConnectionResult::NoConnectionRateLimited ||
                 result == ConnectionResult::FailedToCreateConnection ||
                 result == ConnectionResult::LoadShed),
            fmt::format("Failed to create expected connection: {}", *this));
  if (result == ConnectionResult::FailedToCreateConnection) {
    // This currently only happens for HTTP/3 if secrets aren't yet loaded.
    // Trigger connection failure.
    pending->cancel(Envoy::ConnectionPool::CancelPolicy::CloseExcess);
    onPoolFailure(nullptr, absl::string_view(),
                  ConnectionPool::PoolFailureReason::LocalConnectionFailure, context);
    return nullptr;
  } else if (result == ConnectionResult::LoadShed) {
    // Overload: cancel the just-queued stream and fail it as an overflow.
    pending->cancel(Envoy::ConnectionPool::CancelPolicy::CloseExcess);
    onPoolFailure(nullptr, absl::string_view(), ConnectionPool::PoolFailureReason::Overflow,
                  context);
    return nullptr;
  }
  return pending;
}
387

            
388
102
bool ConnPoolImplBase::maybePreconnectImpl(float global_preconnect_ratio) {
389
102
  ASSERT(!deferred_deleting_, dumpState());
390
102
  return tryCreateNewConnection(global_preconnect_ratio) == ConnectionResult::CreatedNewConnection;
391
102
}
392

            
393
24453
// Schedules onUpstreamReady() to run later in the current dispatcher
// iteration, decoupling stream attachment from the caller's stack.
void ConnPoolImplBase::scheduleOnUpstreamReady() {
  upstream_ready_cb_->scheduleCallbackCurrentIteration();
}
396

            
397
54493
// Attaches queued pending streams (oldest first) to ready clients, then
// preconnects if streams remain unserved.
void ConnPoolImplBase::onUpstreamReady() {
  while (!pending_streams_.empty() && !ready_clients_.empty()) {
    ActiveClientPtr& client = ready_clients_.front();
    ENVOY_CONN_LOG(debug, "attaching to next stream", *client);
    // Pending streams are pushed onto the front, so pull from the back.
    attachStreamToClient(*client, pending_streams_.back()->context());
    cluster_connectivity_state_.decrPendingStreams(1);
    pending_streams_.pop_back();
  }
  // Streams left over after all ready capacity was consumed: try to open
  // additional connections for them.
  if (!pending_streams_.empty()) {
    tryCreateNewConnections();
  }
}
410

            
411
229485
// Maps a client state onto the list that owns clients in that state. Busy and
// Draining clients share busy_clients_; a Closed client belongs to no list and
// is a hard error here.
std::list<ActiveClientPtr>& ConnPoolImplBase::owningList(ActiveClient::State state) {
  if (state == ActiveClient::State::Connecting) {
    return connecting_clients_;
  }
  if (state == ActiveClient::State::ReadyForEarlyData) {
    return early_data_clients_;
  }
  if (state == ActiveClient::State::Ready) {
    return ready_clients_;
  }
  if (state == ActiveClient::State::Busy || state == ActiveClient::State::Draining) {
    return busy_clients_;
  }
  PANIC("unexpected");
}
427

            
428
void ConnPoolImplBase::transitionActiveClientState(ActiveClient& client,
429
83560
                                                   ActiveClient::State new_state) {
430
83560
  auto& old_list = owningList(client.state());
431
83560
  auto& new_list = owningList(new_state);
432
83560
  client.setState(new_state);
433

            
434
  // old_list and new_list can be equal when transitioning from Busy to Draining.
435
  //
436
  // The documentation for list.splice() (which is what moveBetweenLists() calls) is
437
  // unclear whether it is allowed for src and dst to be the same, so check here
438
  // since it is a no-op anyways.
439
83560
  if (&old_list != &new_list) {
440
83548
    client.moveBetweenLists(old_list, new_list);
441
83548
  }
442
83560
}
443

            
444
12565
// Registers a callback to run the next time the pool becomes idle; callbacks
// are cleared after firing (see checkForIdleAndNotify()).
void ConnPoolImplBase::addIdleCallbackImpl(Instance::IdleCb cb) { idle_callbacks_.push_back(cb); }
445

            
446
248
// Closes every client currently serving no streams. Ready clients are always
// eligible; connecting and early-data clients are only closed when there are
// no pending streams that might still need them. Must not be re-entered
// (guarded by the debug recursion checker).
void ConnPoolImplBase::closeIdleConnectionsForDrainingPool() {
  Common::AutoDebugRecursionChecker assert_not_in(recursion_checker_);

  // Create a separate list of elements to close to avoid mutate-while-iterating problems.
  std::list<ActiveClient*> to_close;

  for (auto& client : ready_clients_) {
    if (client->numActiveStreams() == 0) {
      to_close.push_back(client.get());
    }
  }

  if (pending_streams_.empty()) {
    // Nothing is waiting, so in-handshake connections are not needed either.
    for (auto& client : connecting_clients_) {
      to_close.push_back(client.get());
    }
    for (auto& client : early_data_clients_) {
      if (client->numActiveStreams() == 0) {
        to_close.push_back(client.get());
      }
    }
  }

  for (auto& entry : to_close) {
    ENVOY_LOG_EVENT(debug, "closing_idle_client", "closing idle client {} for cluster {}",
                    entry->id(), host_->cluster().name());
    entry->close();
  }
}
475

            
476
194
// Transitions every client in `clients` to Draining. The transition moves
// each client out of `clients` and into busy_clients_, so the loop keeps
// taking front() until the list empties — do not convert this to iteration.
// Callers must already have closed idle clients: each remaining client is
// asserted to be serving at least one stream.
void ConnPoolImplBase::drainClients(std::list<ActiveClientPtr>& clients) {
  while (!clients.empty()) {
    ASSERT(clients.front()->numActiveStreams() > 0u, dumpState());
    ENVOY_LOG_EVENT(
        debug, "draining_non_idle_client", "draining {} client {} for cluster {}",
        (clients.front()->state() == ActiveClient::State::Ready ? "ready" : "early data"),
        clients.front()->id(), host_->cluster().name());
    transitionActiveClientState(*clients.front(), ActiveClient::State::Draining);
  }
}
486

            
487
220
// Implements drainConnections(): DrainAndDelete marks the pool for deletion
// and closes what is already idle; otherwise the pool gracefully drains —
// idle clients are closed and every remaining (stream-serving) client is
// moved to Draining.
void ConnPoolImplBase::drainConnectionsImpl(DrainBehavior drain_behavior) {
  if (drain_behavior == Envoy::ConnectionPool::DrainBehavior::DrainAndDelete) {
    is_draining_for_deletion_ = true;
    checkForIdleAndCloseIdleConnsIfDraining();
    return;
  }
  closeIdleConnectionsForDrainingPool();
  // closeIdleConnectionsForDrainingPool() closes all connections in ready_clients_ with no active
  // streams and if no pending streams, all connections in early_data_clients_ with no active
  // streams as well, so all remaining entries in ready_clients_ are serving streams. Move them and
  // all entries in busy_clients_ to draining.
  if (pending_streams_.empty()) {
    // The remaining early data clients are non-idle.
    drainClients(early_data_clients_);
  }

  drainClients(ready_clients_);

  // Changing busy_clients_ to Draining does not move them between lists,
  // so use a for-loop since the list is not mutated.
  ASSERT(&owningList(ActiveClient::State::Draining) == &busy_clients_, dumpState());
  for (auto& busy_client : busy_clients_) {
    if (busy_client->state() == ActiveClient::State::Draining) {
      continue;
    }
    ENVOY_LOG_EVENT(debug, "draining_busy_client", "draining busy client {} for cluster {}",
                    busy_client->id(), host_->cluster().name());
    transitionActiveClientState(*busy_client, ActiveClient::State::Draining);
  }
}
517

            
518
133829
bool ConnPoolImplBase::isIdleImpl() const {
519
133829
  return pending_streams_.empty() && ready_clients_.empty() && busy_clients_.empty() &&
520
133829
         connecting_clients_.empty() && early_data_clients_.empty();
521
133829
}
522

            
523
/*
  This method may be invoked once or twice.
  It is called the first time in ConnPoolImplBase::onConnectionEvent for Local/RemoteClose events.
  The second time it is called from Envoy::Tcp::ActiveTcpClient::~ActiveTcpClient via
  ConnPoolImplBase::checkForIdleAndCloseIdleConnsIfDraining.

  The logic must be constructed in such a way that the method works whether it is called once or
  twice. See PR 30807 description for explanation why idle callbacks are deleted after being
  called.
*/
void ConnPoolImplBase::checkForIdleAndNotify() {
  if (isIdleImpl()) {
    ENVOY_LOG(debug, "invoking {} idle callback(s) - is_draining_for_deletion_={}",
              idle_callbacks_.size(), is_draining_for_deletion_);
    for (const Instance::IdleCb& cb : idle_callbacks_) {
      cb();
    }
    // Clear callbacks, so they are not executed if checkForIdleAndNotify is called again.
    idle_callbacks_.clear();
  }
}
543

            
544
77558
// If the pool is draining for deletion, proactively close idle connections,
// then run the idle check / notification in either case.
void ConnPoolImplBase::checkForIdleAndCloseIdleConnsIfDraining() {
  if (is_draining_for_deletion_) {
    closeIdleConnectionsForDrainingPool();
  }

  checkForIdleAndNotify();
}
551

            
552
// Central handler for upstream connection lifecycle events (close, connected,
// 0-RTT connected). Updates capacity accounting, stats, client state, and may
// purge pending streams or create replacement connections. Capacity invariants
// are re-checked at the end via assertCapacityCountsAreCorrect().
void ConnPoolImplBase::onConnectionEvent(ActiveClient& client, absl::string_view failure_reason,
                                         Network::ConnectionEvent event) {
  switch (event) {
  case Network::ConnectionEvent::RemoteClose:
  case Network::ConnectionEvent::LocalClose: {
    // A live connect timer implies the handshake never completed; stop it so it
    // cannot fire for a connection that is already gone.
    if (client.connect_timer_) {
      ASSERT(!client.has_handshake_completed_);
      client.connect_timer_->disableTimer();
      client.connect_timer_.reset();
    }
    // Remove whatever capacity this client was still contributing to the pool.
    decrConnectingAndConnectedStreamCapacity(client.currentUnusedCapacity(), client);

    // Make sure that onStreamClosed won't double count.
    client.remaining_streams_ = 0;
    // The client died.
    ENVOY_CONN_LOG(debug, "client disconnected, failure reason: {}", client, failure_reason);

    Envoy::Upstream::reportUpstreamCxDestroy(host_, event);
    const bool incomplete_stream = client.closingWithIncompleteStream();
    if (incomplete_stream) {
      Envoy::Upstream::reportUpstreamCxDestroyActiveRequest(host_, event);
    }

    // Handshake never completed: this close is a connect failure.
    if (!client.hasHandshakeCompleted()) {
      client.has_handshake_completed_ = true;
      host_->cluster().trafficStats()->upstream_cx_connect_fail_.inc();
      host_->stats().cx_connect_fail_.inc();

      onConnectFailed(client);
      // Classify the failure so callers (e.g. retry policy) can distinguish
      // timeouts from remote resets and local failures.
      ConnectionPool::PoolFailureReason reason;
      if (client.timed_out_) {
        reason = ConnectionPool::PoolFailureReason::Timeout;
      } else if (event == Network::ConnectionEvent::RemoteClose) {
        reason = ConnectionPool::PoolFailureReason::RemoteConnectionFailure;
      } else {
        reason = ConnectionPool::PoolFailureReason::LocalConnectionFailure;
      }

      // Raw connect failures should never happen under normal circumstances. If we have an
      // upstream that is behaving badly, streams can get stuck here in the pending state. If we
      // see a connect failure, we purge all pending streams so that calling code can determine
      // what to do with the stream.
      // NOTE: We move the existing pending streams to a temporary list. This is done so that
      //       if retry logic submits a new stream to the pool, we don't fail it inline.
      purgePendingStreams(client.real_host_description_, failure_reason, reason);
      // See if we should preconnect based on active connections.
      if (!is_draining_for_deletion_) {
        tryCreateNewConnections();
      }
    }

    // We need to release our resourceManager() resources before checking below for
    // whether we can create a new connection. Normally this would happen when
    // client's destructor runs, but this object needs to be deferredDelete'd(), so
    // this forces part of its cleanup to happen now.
    client.releaseResources();

    // Again, since we know this object is going to be deferredDelete'd(), we take
    // this opportunity to disable and reset the connection duration timer so that
    // it doesn't trigger while on the deferred delete list. In theory it is safe
    // to handle the Closed state in onConnectionDurationTimeout, but we handle
    // it here for simplicity and safety anyway.
    if (client.connection_duration_timer_) {
      client.connection_duration_timer_->disableTimer();
      client.connection_duration_timer_.reset();
    }

    dispatcher_.deferredDelete(client.removeFromList(owningList(client.state())));

    // Check if the pool transitioned to idle state after removing closed client
    // from one of the client tracking lists.
    // There is no need to check if other connections are idle in a draining pool
    // because the pool will close all idle connection when it is starting to
    // drain.
    // Trying to close other connections here can lead to deep recursion when
    // a large number idle connections are closed at the start of pool drain.
    // See CdsIntegrationTest.CdsClusterDownWithLotsOfIdleConnections for an example.
    checkForIdleAndNotify();

    client.setState(ActiveClient::State::Closed);

    // If we have pending streams and we just lost a connection we should make a new one.
    if (!pending_streams_.empty()) {
      tryCreateNewConnections();
    }
    break;
  }
  case Network::ConnectionEvent::Connected: {
    ASSERT(client.connect_timer_ != nullptr && !client.has_handshake_completed_);
    client.connect_timer_->disableTimer();
    client.connect_timer_.reset();

    // The client's unused capacity moves from "connecting" accounting into the
    // connected pool; the combined total is unchanged.
    ENVOY_BUG(connecting_stream_capacity_ >= client.currentUnusedCapacity(), dumpState());
    connecting_stream_capacity_ -= client.currentUnusedCapacity();
    client.has_handshake_completed_ = true;
    client.conn_connect_ms_->complete();
    client.conn_connect_ms_.reset();
    if (client.state() == ActiveClient::State::Connecting ||
        client.state() == ActiveClient::State::ReadyForEarlyData) {
      transitionActiveClientState(client,
                                  (client.currentUnusedCapacity() > 0 ? ActiveClient::State::Ready
                                                                      : ActiveClient::State::Busy));
    }

    // Now that the active client is ready, set up a timer for max connection duration.
    const absl::optional<std::chrono::milliseconds> max_connection_duration =
        client.parent_.host()->cluster().maxConnectionDuration();
    if (max_connection_duration.has_value()) {
      client.connection_duration_timer_ = client.parent_.dispatcher().createTimer(
          [&client]() { client.onConnectionDurationTimeout(); });
      client.connection_duration_timer_->enableTimer(max_connection_duration.value());
    }
    // Initialize client read filters
    client.initializeReadFilters();

    // At this point, for the mixed ALPN pool, the client may be deleted. Do not
    // refer to client after this point.
    onConnected(client);
    if (client.readyForStream()) {
      onUpstreamReady();
    }
    checkForIdleAndCloseIdleConnsIfDraining();
    break;
  }
  case Network::ConnectionEvent::ConnectedZeroRtt: {
    ENVOY_CONN_LOG(debug, "0-RTT connected with capacity {}", client,
                   client.currentUnusedCapacity());
    // No need to update connecting capacity and connect_timer_ as the client is still connecting.
    ASSERT(client.state() == ActiveClient::State::Connecting);
    host()->cluster().trafficStats()->upstream_cx_connect_with_0_rtt_.inc();
    transitionActiveClientState(client, (client.currentUnusedCapacity() > 0
                                             ? ActiveClient::State::ReadyForEarlyData
                                             : ActiveClient::State::Busy));
    break;
  }
  }
  assertCapacityCountsAreCorrect();
}
693

            
694
// A stream waiting for upstream capacity. Construction registers the stream
// against both the cluster traffic stats and the circuit-breaker pending
// request budget; the destructor unwinds both.
PendingStream::PendingStream(ConnPoolImplBase& parent, bool can_send_early_data)
    : parent_(parent), can_send_early_data_(can_send_early_data) {
  // Count against the circuit breaker's pending-request resource.
  parent_.host()->cluster().resourceManager(parent_.priority()).pendingRequests().inc();
  Upstream::ClusterTrafficStats& stats = *parent_.host()->cluster().trafficStats();
  stats.upstream_rq_pending_total_.inc();
  stats.upstream_rq_pending_active_.inc();
}
701

            
702
31994
// Unwind the active-pending gauge and circuit-breaker resource taken in the
// constructor (upstream_rq_pending_total_ is a counter and is not decremented).
PendingStream::~PendingStream() {
  parent_.host()->cluster().trafficStats()->upstream_rq_pending_active_.dec();
  parent_.host()->cluster().resourceManager(parent_.priority()).pendingRequests().dec();
}
706

            
707
540
// Forward cancellation to the owning pool, which removes this stream from the
// appropriate list and may close an excess connection depending on `policy`.
void PendingStream::cancel(Envoy::ConnectionPool::CancelPolicy policy) {
  parent_.onPendingStreamCancel(*this, policy);
}
710

            
711
// Fail every pending stream with the given reason (used on connect failure).
// Streams are drained one at a time from a staging list because the
// onPoolFailure callback may re-enter the pool (e.g. cancel another pending
// stream, or submit a new one).
void ConnPoolImplBase::purgePendingStreams(
    const Upstream::HostDescriptionConstSharedPtr& host_description,
    absl::string_view failure_reason, ConnectionPool::PoolFailureReason reason) {
  // NOTE: We move the existing pending streams to a temporary list. This is done so that
  //       if retry logic submits a new stream to the pool, we don't fail it inline.
  cluster_connectivity_state_.decrPendingStreams(pending_streams_.size());
  pending_streams_to_purge_ = std::move(pending_streams_);
  // Re-check emptiness every iteration: callbacks may remove entries from
  // pending_streams_to_purge_ (see onPendingStreamCancel).
  while (!pending_streams_to_purge_.empty()) {
    PendingStreamPtr stream =
        pending_streams_to_purge_.front()->removeFromList(pending_streams_to_purge_);
    host_->cluster().trafficStats()->upstream_rq_pending_failure_eject_.inc();
    onPoolFailure(host_description, failure_reason, reason, stream->context());
  }
}
725

            
726
30
bool ConnPoolImplBase::connectingConnectionIsExcess(const ActiveClient& client) const {
727
30
  ASSERT(!client.hasHandshakeCompleted());
728
30
  ENVOY_BUG(connecting_stream_capacity_ >= client.currentUnusedCapacity(), dumpState());
729
  // If perUpstreamPreconnectRatio is one, this simplifies to checking if there would still be
730
  // sufficient connecting stream capacity to serve all pending streams if the most recent client
731
  // were removed from the picture.
732
  //
733
  // If preconnect ratio is set, it also factors in the anticipated load based on both queued
734
  // streams and active streams, and makes sure the connecting capacity would still be sufficient to
735
  // serve that even with the most recent client removed.
736
30
  return (pending_streams_.size() + num_active_streams_) * perUpstreamPreconnectRatio() <=
737
30
         (connecting_stream_capacity_ - client.currentUnusedCapacity() + num_active_streams_);
738
30
}
739

            
740
// Remove a cancelled pending stream from the pool and, if requested by the
// cancel policy, close one excess connecting (or idle early-data) connection
// whose capacity is no longer needed.
void ConnPoolImplBase::onPendingStreamCancel(PendingStream& stream,
                                             Envoy::ConnectionPool::CancelPolicy policy) {
  ENVOY_LOG(debug, "cancelling pending stream");
  if (!pending_streams_to_purge_.empty()) {
    // If pending_streams_to_purge_ is not empty, it means that we are called from
    // with-in a onPoolFailure callback invoked in purgePendingStreams (i.e. purgePendingStreams
    // is down in the call stack). Remove this stream from the list as it is cancelled,
    // and there is no need to call its onPoolFailure callback.
    stream.removeFromList(pending_streams_to_purge_);
  } else {
    cluster_connectivity_state_.decrPendingStreams(1);
    stream.removeFromList(pending_streams_);
  }
  if (policy == Envoy::ConnectionPool::CancelPolicy::CloseExcess) {
    if (!connecting_clients_.empty() &&
        connectingConnectionIsExcess(*connecting_clients_.front())) {
      // Close one excess connecting client.
      auto& client = *connecting_clients_.front();
      transitionActiveClientState(client, ActiveClient::State::Draining);
      client.close();
    } else if (!early_data_clients_.empty()) {
      for (ActiveClientPtr& client : early_data_clients_) {
        if (client->numActiveStreams() == 0) {
          // Find an idle early data client and check if it is excess.
          if (connectingConnectionIsExcess(*client)) {
            // close() will remove the client from early_data_clients_; break out
            // of the loop right after so the iterator is never advanced past a
            // mutated list.
            transitionActiveClientState(*client, ActiveClient::State::Draining);
            client->close();
          }
          // Only the first idle early-data client is considered.
          break;
        }
      }
    }
  }

  host_->cluster().trafficStats()->upstream_rq_cancelled_.inc();
  checkForIdleAndCloseIdleConnsIfDraining();
}
777

            
778
void ConnPoolImplBase::decrConnectingAndConnectedStreamCapacity(uint32_t delta,
779
80269
                                                                ActiveClient& client) {
780
80269
  decrClusterStreamCapacity(delta);
781

            
782
80269
  if (!client.hasHandshakeCompleted()) {
783
    // If still doing handshake, it is contributing to the local connecting stream capacity. Update
784
    // the capacity as well.
785
408
    ENVOY_BUG(connecting_stream_capacity_ >= delta, dumpState());
786
408
    connecting_stream_capacity_ -= delta;
787
408
  }
788
80269
}
789

            
790
void ConnPoolImplBase::incrConnectingAndConnectedStreamCapacity(uint32_t delta,
791
75639
                                                                ActiveClient& client) {
792
75639
  incrClusterStreamCapacity(delta);
793

            
794
75639
  if (!client.hasHandshakeCompleted()) {
795
31121
    connecting_stream_capacity_ += delta;
796
31121
  }
797
75639
}
798

            
799
53
// Attach as many early-data-capable pending streams as the 0-RTT client has
// capacity for. Iterates the pending list from newest to oldest; streams not
// marked safe for early data are skipped and stay pending.
void ConnPoolImplBase::onUpstreamReadyForEarlyData(ActiveClient& client) {
  ASSERT(!client.hasHandshakeCompleted() && client.readyForStream());
  // Check pending streams backward for safe request.
  // Note that this is a O(n) search, but the expected size of pending_streams_ should be small. If
  // this becomes a problem, we could split pending_streams_ into 2 lists.
  auto it = pending_streams_.end();
  if (it == pending_streams_.begin()) {
    return;
  }
  --it;
  while (client.currentUnusedCapacity() > 0) {
    PendingStream& stream = **it;
    // Advance the iterator before possibly removing `stream` from the list,
    // so the iterator is never invalidated. `stop_iteration` defers the loop
    // exit until after `stream` has been processed.
    bool stop_iteration{false};
    if (it != pending_streams_.begin()) {
      --it;
    } else {
      stop_iteration = true;
    }

    if (stream.can_send_early_data_) {
      ENVOY_CONN_LOG(debug, "creating stream for early data.", client);
      attachStreamToClient(client, stream.context());
      cluster_connectivity_state_.decrPendingStreams(1);
      stream.removeFromList(pending_streams_);
    }
    if (stop_iteration) {
      return;
    }
  }
}
829

            
830
namespace {
// Translate zero to UINT32_MAX so that the zero/unlimited case doesn't
// have to be handled specially.
uint32_t translateZeroToUnlimited(uint32_t limit) {
  if (limit == 0) {
    return std::numeric_limits<uint32_t>::max();
  }
  return limit;
}
} // namespace
837

            
838
// Convenience constructor: the effective concurrent stream limit defaults to
// the configured concurrent stream limit.
ActiveClient::ActiveClient(ConnPoolImplBase& parent, uint32_t lifetime_stream_limit,
                           uint32_t concurrent_stream_limit)
    : ActiveClient(parent, lifetime_stream_limit, concurrent_stream_limit,
                   concurrent_stream_limit) {}
842

            
843
// Full constructor. Zero limits mean "unlimited" (translated to UINT32_MAX).
// Starts the connect timer and connect-time histogram, and registers the new
// connection in host/cluster stats and the circuit-breaker connection resource
// (all released in releaseResourcesBase()).
ActiveClient::ActiveClient(ConnPoolImplBase& parent, uint32_t lifetime_stream_limit,
                           uint32_t effective_concurrent_streams, uint32_t concurrent_stream_limit)
    : parent_(parent), remaining_streams_(translateZeroToUnlimited(lifetime_stream_limit)),
      configured_stream_limit_(translateZeroToUnlimited(effective_concurrent_streams)),
      concurrent_stream_limit_(translateZeroToUnlimited(concurrent_stream_limit)),
      connect_timer_(parent_.dispatcher().createTimer([this]() { onConnectTimeout(); })) {
  conn_connect_ms_ = std::make_unique<Stats::HistogramCompletableTimespanImpl>(
      parent_.host()->cluster().trafficStats()->upstream_cx_connect_ms_,
      parent_.dispatcher().timeSource());
  conn_length_ = std::make_unique<Stats::HistogramCompletableTimespanImpl>(
      parent_.host()->cluster().trafficStats()->upstream_cx_length_ms_,
      parent_.dispatcher().timeSource());
  connect_timer_->enableTimer(parent_.host()->cluster().connectTimeout());
  parent_.host()->stats().cx_total_.inc();
  parent_.host()->stats().cx_active_.inc();
  parent_.host()->cluster().trafficStats()->upstream_cx_total_.inc();
  parent_.host()->cluster().trafficStats()->upstream_cx_active_.inc();
  parent_.host()->cluster().resourceManager(parent_.priority()).connections().inc();
}
862

            
863
31179
// Release stats/resource-manager bookkeeping if it has not already been
// released explicitly (releaseResourcesBase() is idempotent).
ActiveClient::~ActiveClient() { releaseResourcesBase(); }
864

            
865
62298
void ActiveClient::releaseResourcesBase() {
866
62298
  if (!resources_released_) {
867
31179
    resources_released_ = true;
868

            
869
31179
    conn_length_->complete();
870

            
871
31179
    parent_.host()->cluster().trafficStats()->upstream_cx_active_.dec();
872
31179
    parent_.host()->stats().cx_active_.dec();
873
31179
    parent_.host()->cluster().resourceManager(parent_.priority()).connections().dec();
874
31179
  }
875
62298
}
876

            
877
8
// The connect timer fired before the handshake completed: record the timeout
// and close the connection. timed_out_ lets onConnectionEvent classify the
// resulting close as PoolFailureReason::Timeout.
void ActiveClient::onConnectTimeout() {
  ENVOY_CONN_LOG(debug, "connect timeout", *this);
  parent_.host()->cluster().trafficStats()->upstream_cx_connect_timeout_.inc();
  timed_out_ = true;
  close();
}
883

            
884
16
// The max-connection-duration timer fired: start draining this client so no
// new streams are assigned to it, closing immediately if it is already idle.
// Guards against states in which the timer should never legitimately fire.
void ActiveClient::onConnectionDurationTimeout() {
  if (!hasHandshakeCompleted()) {
    // The connection duration timer should only have started after we were connected.
    ENVOY_BUG(false, "max connection duration reached while connecting");
    return;
  }

  if (state_ == ActiveClient::State::Closed) {
    // The connection duration timer should have been disabled and reset in onConnectionEvent
    // for closing connections.
    ENVOY_BUG(false, "max connection duration reached while closed");
    return;
  }

  // There's nothing to do if the client is draining.
  if (state_ == ActiveClient::State::Draining) {
    return;
  }

  ENVOY_CONN_LOG(debug, "max connection duration reached, start draining", *this);
  parent_.host()->cluster().trafficStats()->upstream_cx_max_duration_reached_.inc();
  parent_.transitionActiveClientState(*this, Envoy::ConnectionPool::ActiveClient::State::Draining);

  // Close out the draining client if we no longer have active streams.
  // We have to do this here because there won't be an onStreamClosed (because there are
  // no active streams) to do it for us later.
  if (numActiveStreams() == 0) {
    close();
  }
}
914

            
915
207
// Transition bookkeeping for a client entering the Draining state: zero its
// remaining lifetime streams and remove any unused capacity it was
// contributing to the pool.
void ActiveClient::drain() {
  const int64_t unused = currentUnusedCapacity();

  // Remove draining client's capacity from the pool.
  //
  // The code that adds capacity back to the pool in `onStreamClosed` will only add it back if
  // it sees the connection as currently limited by concurrent capacity, not total lifetime streams.
  // Setting this to zero ensures that the `limited_by_concurrency` check does not detect this
  // connection as limited for that reason, because it is now being marked as having zero remaining
  // lifetime requests.
  remaining_streams_ = 0;

  // A client already at or below zero unused capacity (e.g. over-committed busy
  // clients) contributes nothing to subtract.
  if (unused > 0) {
    parent_.decrConnectingAndConnectedStreamCapacity(unused, *this);
  }
}
931

            
932
} // namespace ConnectionPool
933
} // namespace Envoy