#include "source/common/conn_pool/conn_pool_base.h"

#include <utility>

#include "envoy/server/overload/load_shed_point.h"

#include "source/common/common/assert.h"
#include "source/common/common/debug_recursion_checker.h"
#include "source/common/network/transport_socket_options_impl.h"
#include "source/common/runtime/runtime_features.h"
#include "source/common/stats/timespan_impl.h"
#include "source/common/upstream/upstream_impl.h"
12
namespace Envoy {
13
namespace ConnectionPool {
14
namespace {
15
int64_t currentUnusedCapacity(const std::list<ActiveClientPtr>& connecting_clients) {
16
  int64_t ret = 0;
17
  for (const auto& client : connecting_clients) {
18
    ret += client->currentUnusedCapacity();
19
  }
20
  return ret;
21
}
22
} // namespace
23

            
24
1
// Renders a human-readable dump of the pool's state; used as the message in
// the pool's ASSERT/ENVOY_BUG checks.
std::string ConnPoolImplBase::dumpState() const {
  return fmt::format("State: {}", *this);
}
25

            
26
142558
// Debug-only consistency checks tying the pool's cached capacity counters to
// the per-client capacity actually present in each client list. These compile
// away (SLOW_ASSERT) or log-and-continue (ENVOY_BUG) in release builds.
void ConnPoolImplBase::assertCapacityCountsAreCorrect() {
  // connecting_stream_capacity_ must equal the unused capacity of clients that
  // have not yet fully connected: connecting plus early-data clients.
  SLOW_ASSERT(static_cast<int64_t>(connecting_stream_capacity_) ==
                  currentUnusedCapacity(connecting_clients_) +
                      currentUnusedCapacity(early_data_clients_),
              dumpState());

  // Note: must include `busy_clients_` because they can have negative current unused capacity,
  // which is included in `connecting_and_connected_stream_capacity_`.
  SLOW_ASSERT(
      connecting_and_connected_stream_capacity_ ==
          (static_cast<int64_t>(connecting_stream_capacity_) +
           currentUnusedCapacity(ready_clients_) + currentUnusedCapacity(busy_clients_)),
      fmt::format(
          "{} currentUnusedCapacity(ready_clients_) {}, currentUnusedCapacity(busy_clients_) {}",
          *this, currentUnusedCapacity(ready_clients_), currentUnusedCapacity(busy_clients_)));

  // Busy clients never report positive unused capacity.
  SLOW_ASSERT(currentUnusedCapacity(busy_clients_) <= 0, dumpState());

  // Connected (non-connecting) capacity is positive exactly when at least one
  // ready client exists.
  if (ready_clients_.empty()) {
    ENVOY_BUG((connecting_and_connected_stream_capacity_ - connecting_stream_capacity_) <= 0,
              dumpState());
  } else {
    ENVOY_BUG((connecting_and_connected_stream_capacity_ - connecting_stream_capacity_) > 0,
              dumpState());
  }
}
52

            
53
// Constructs the pool, latching host/priority/dispatcher configuration and
// looking up the optional ConnectionPoolNewConnection load shed point, which
// (when configured) lets tryCreateNewConnection() reject new connection
// attempts under overload.
ConnPoolImplBase::ConnPoolImplBase(
    Upstream::HostConstSharedPtr host, Upstream::ResourcePriority priority,
    Event::Dispatcher& dispatcher, const Network::ConnectionSocket::OptionsSharedPtr& options,
    const Network::TransportSocketOptionsConstSharedPtr& transport_socket_options,
    Upstream::ClusterConnectivityState& state, Server::OverloadManager& overload_manager)
    : host_(host), priority_(priority), dispatcher_(dispatcher), socket_options_(options),
      transport_socket_options_(transport_socket_options), cluster_connectivity_state_(state),
      // Schedulable callback used to run onUpstreamReady() outside the current
      // call stack (see scheduleOnUpstreamReady()).
      upstream_ready_cb_(dispatcher_.createSchedulableCallback([this]() { onUpstreamReady(); })),
      create_new_connection_load_shed_(overload_manager.getLoadShedPoint(
          Server::LoadShedPointName::get().ConnectionPoolNewConnection)) {
  // Log once if the load shed point is not configured; the pool still works,
  // it just never sheds new-connection load.
  ENVOY_LOG_ONCE_IF(trace, create_new_connection_load_shed_ == nullptr,
                    "LoadShedPoint envoy.load_shed_points.connection_pool_new_connection is not "
                    "found. Is it configured?");
}
67

            
68
12689
// The pool must be fully drained before destruction: no pending streams, no
// clients in any list, and no outstanding stream capacity.
ConnPoolImplBase::~ConnPoolImplBase() {
  ENVOY_BUG(isIdleImpl(), dumpState());
  ENVOY_BUG(connecting_stream_capacity_ == 0, dumpState());
  ENVOY_BUG(connecting_and_connected_stream_capacity_ == 0, dumpState());
}
73

            
74
12246
// Called when the pool is scheduled for deferred deletion. Marks the pool so
// later entry points (e.g. newStreamImpl()) can assert against
// use-after-schedule, and checks the same drained invariants as the
// destructor.
void ConnPoolImplBase::deleteIsPendingImpl() {
  deferred_deleting_ = true;
  ENVOY_BUG(isIdleImpl(), dumpState());
  ENVOY_BUG(connecting_stream_capacity_ == 0, dumpState());
  ENVOY_BUG(connecting_and_connected_stream_capacity_ == 0, dumpState());
}
80

            
81
13623
void ConnPoolImplBase::destructAllConnections() {
82
54492
  for (auto* list : {&ready_clients_, &busy_clients_, &connecting_clients_, &early_data_clients_}) {
83
54783
    while (!list->empty()) {
84
291
      list->front()->close();
85
291
    }
86
54492
  }
87

            
88
  // Make sure all clients are destroyed before we are destroyed.
89
13623
  dispatcher_.clearDeferredDeleteList();
90
13623
}
91

            
92
// Returns true when the currently provisioned stream capacity is insufficient
// for the demanded (ratio-scaled) number of streams, i.e. a new connection
// should be established.
//
// `anticipate_incoming_stream` is set whenever global preconnect is being
// calculated: ClusterManagerImpl::maybePreconnect is called directly before a
// stream is created, so that stream must be anticipated. It also allows the
// very first connection of a pool to be pre-established when pending and
// active stream counts are both still zero.
bool ConnPoolImplBase::shouldConnect(size_t pending_streams, size_t active_streams,
                                     int64_t connecting_and_connected_capacity,
                                     float preconnect_ratio, bool anticipate_incoming_stream) {
  const int anticipated_streams = anticipate_incoming_stream ? 1 : 0;

  // Demand: (pending + active + anticipated) streams, scaled by the preconnect
  // ratio. Supply: connecting/connected capacity plus streams already served.
  // With the default ratio of 1 this reduces to the legacy check
  // pending_streams_.size() > connecting_stream_capacity_.
  const float demanded_capacity =
      (pending_streams + active_streams + anticipated_streams) * preconnect_ratio;
  return demanded_capacity > connecting_and_connected_capacity + active_streams;
}
113

            
114
82181
// Decides whether this pool should establish another connection. A non-zero
// `global_preconnect_ratio` selects the global (predictive) preconnect path;
// zero selects the per-upstream path. Unhealthy hosts fall back to the legacy
// pending-vs-connecting-capacity check.
bool ConnPoolImplBase::shouldCreateNewConnection(float global_preconnect_ratio) const {
  // If the host is not healthy, don't make it do extra work, especially as
  // upstream selection logic may result in bypassing this upstream entirely.
  // If an Envoy user wants preconnecting for degraded upstreams this could be
  // added later via extending the preconnect config.
  if (host_->coarseHealth() != Upstream::Host::Health::Healthy) {
    return pending_streams_.size() > connecting_stream_capacity_;
  }

  // Determine if we are trying to prefetch for global preconnect or local preconnect.
  if (global_preconnect_ratio != 0) {
    // If global preconnecting is on, and this connection is within the global
    // preconnect limit, preconnect.
    // For global preconnect, we anticipate an incoming stream to this pool, since it is
    // prefetching for the next upcoming stream, which will likely be assigned to this pool.
    // We may eventually want to track preconnect_attempts to allow more preconnecting for
    // heavily weighted upstreams or sticky picks.
    bool result =
        shouldConnect(pending_streams_.size(), num_active_streams_,
                      connecting_and_connected_stream_capacity_, global_preconnect_ratio, true);
    ENVOY_LOG(trace,
              "predictive shouldCreateNewConnection returns {} for pending {} active {} "
              "connecting_and_connected_capacity {} connecting_capacity {} ratio {}",
              result, pending_streams_.size(), num_active_streams_,
              connecting_and_connected_stream_capacity_, connecting_stream_capacity_,
              global_preconnect_ratio);
    return result;
  } else {
    // Ensure this local pool has adequate connections for the given load.
    //
    // Local preconnect does not need to anticipate a stream. It is called as
    // new streams are established or torn down and simply attempts to maintain
    // the correct ratio of streams and anticipated capacity.
    bool result =
        shouldConnect(pending_streams_.size(), num_active_streams_,
                      connecting_and_connected_stream_capacity_, perUpstreamPreconnectRatio());
    ENVOY_LOG(trace,
              "per-upstream shouldCreateNewConnection returns {} for pending {} active {} "
              "connecting_and_connected_capacity {} connecting_capacity {} ratio {}",
              result, pending_streams_.size(), num_active_streams_,
              connecting_and_connected_stream_capacity_, connecting_stream_capacity_,
              perUpstreamPreconnectRatio());
    return result;
  }
}
159

            
160
82347
// Returns the cluster-configured preconnect ratio used for this pool's local
// (per-upstream) preconnect decisions.
float ConnPoolImplBase::perUpstreamPreconnectRatio() const {
  return host_->cluster().perUpstreamPreconnectRatio();
}
163

            
164
51171
// Attempts to create new connections, stopping after the first attempt that
// does not produce a fresh connection, and returning the result of the last
// attempt.
//
// The cap of three is somewhat arbitrary: the preconnect ratio is capped at
// 3, so in steady state no more than 3 connections should be preconnected. If
// hosts go unhealthy, and connections are not immediately preconnected, it
// could be that many connections are desired when the host becomes healthy
// again, but overwhelming it with connections is not desirable.
ConnPoolImplBase::ConnectionResult ConnPoolImplBase::tryCreateNewConnections() {
  ConnPoolImplBase::ConnectionResult result = tryCreateNewConnection();
  int attempts = 1;
  while (attempts < 3 && result == ConnectionResult::CreatedNewConnection) {
    result = tryCreateNewConnection();
    ++attempts;
  }
  // A pool draining for deletion must never spin up new connections.
  ASSERT(!is_draining_for_deletion_ || result != ConnectionResult::CreatedNewConnection,
         dumpState());
  return result;
}
182

            
183
// Creates at most one new connection, gated (in order) by preconnect logic,
// the new-connection load shed point, and the cluster's connection circuit
// breaker. A `global_preconnect_ratio` of 0 selects per-upstream preconnect
// logic.
ConnPoolImplBase::ConnectionResult
ConnPoolImplBase::tryCreateNewConnection(float global_preconnect_ratio) {
  // There are already enough Connecting connections for the number of queued streams.
  if (!shouldCreateNewConnection(global_preconnect_ratio)) {
    return ConnectionResult::ShouldNotConnect;
  }
  ENVOY_LOG(trace, "creating new preconnect connection");

  // Drop new connection attempts if the load shed point indicates overload.
  if (create_new_connection_load_shed_) {
    if (create_new_connection_load_shed_->shouldShedLoad()) {
      return ConnectionResult::LoadShed;
    }
  }

  const bool can_create_connection = host_->canCreateConnection(priority_);

  if (!can_create_connection) {
    host_->cluster().trafficStats()->upstream_cx_overflow_.inc();
  }
  // If we are at the connection circuit-breaker limit due to other upstreams having
  // too many open connections, and this upstream has no connections, always create one, to
  // prevent pending streams being queued to this upstream with no way to be processed.
  if (can_create_connection || (ready_clients_.empty() && busy_clients_.empty() &&
                                connecting_clients_.empty() && early_data_clients_.empty())) {
    ENVOY_LOG(debug, "creating a new connection (connecting={})", connecting_clients_.size());
    ActiveClientPtr client = instantiateActiveClient();
    if (client.get() == nullptr) {
      ENVOY_LOG(trace, "connection creation failed");
      return ConnectionResult::FailedToCreateConnection;
    }
    ASSERT(client->state() == ActiveClient::State::Connecting, dumpState());
    // Guard against connecting_stream_capacity_ overflowing when the new
    // client's capacity is added below.
    ENVOY_BUG(std::numeric_limits<uint64_t>::max() - connecting_stream_capacity_ >=
                  static_cast<uint64_t>(client->currentUnusedCapacity()),
              dumpState());
    ASSERT(client->real_host_description_);
    // Increase the connecting capacity to reflect the streams this connection can serve.
    incrConnectingAndConnectedStreamCapacity(client->currentUnusedCapacity(), *client);
    LinkedList::moveIntoList(std::move(client), owningList(client->state()));
    assertCapacityCountsAreCorrect();
    // A connection created only because the pool was empty (circuit breaker
    // exception above) is reported as CreatedButRateLimited.
    return can_create_connection ? ConnectionResult::CreatedNewConnection
                                 : ConnectionResult::CreatedButRateLimited;
  } else {
    ENVOY_LOG(trace, "not creating a new connection: connection constrained");
    return ConnectionResult::NoConnectionRateLimited;
  }
}
230

            
231
// Attaches a stream to a client that is ready for it: enforces the cluster's
// max-requests circuit breaker, updates per-connection stream accounting and
// Busy/Draining transitions, bumps stats, then invokes onPoolReady().
void ConnPoolImplBase::attachStreamToClient(Envoy::ConnectionPool::ActiveClient& client,
                                            AttachContext& context) {
  ASSERT(client.readyForStream(), dumpState());

  Upstream::ClusterTrafficStats& traffic_stats = *host_->cluster().trafficStats();
  // Count streams sent on a connection whose handshake has not yet completed
  // (0-RTT / early data).
  if (client.state() == Envoy::ConnectionPool::ActiveClient::State::ReadyForEarlyData) {
    traffic_stats.upstream_rq_0rtt_.inc();
  }

  // Reject the stream if the cluster's request circuit breaker is saturated.
  if (enforceMaxRequests() && !host_->cluster().resourceManager(priority_).requests().canCreate()) {
    ENVOY_LOG(debug, "max streams overflow");
    onPoolFailure(client.real_host_description_, absl::string_view(),
                  ConnectionPool::PoolFailureReason::Overflow, context);
    traffic_stats.upstream_rq_pending_overflow_.inc();
    return;
  }
  ENVOY_CONN_LOG(debug, "creating stream", client);

  // Latch capacity before updating remaining streams.
  uint64_t capacity = client.currentUnusedCapacity();
  client.remaining_streams_--;
  if (client.remaining_streams_ == 0) {
    // The connection reached its lifetime stream limit: stop assigning it new
    // streams and let it drain.
    ENVOY_CONN_LOG(debug, "maximum streams per connection, start draining", client);
    traffic_stats.upstream_cx_max_requests_.inc();
    transitionActiveClientState(client, Envoy::ConnectionPool::ActiveClient::State::Draining);
  } else if (capacity == 1) {
    // As soon as the new stream is created, the client will be maxed out.
    transitionActiveClientState(client, Envoy::ConnectionPool::ActiveClient::State::Busy);
  }

  // Decrement the capacity, as there's one less stream available for serving.
  // For HTTP/3, the capacity is updated in newStreamEncoder.
  if (trackStreamCapacity()) {
    decrConnectingAndConnectedStreamCapacity(1, client);
  }
  // Track the new active stream.
  cluster_connectivity_state_.incrActiveStreams(1);
  num_active_streams_++;
  host_->stats().rq_total_.inc();
  host_->stats().rq_active_.inc();
  traffic_stats.upstream_rq_total_.inc();
  traffic_stats.upstream_rq_active_.inc();
  host_->cluster().resourceManager(priority_).requests().inc();

  onPoolReady(client, context);
}
277

            
278
// Bookkeeping for a stream that finished on `client`: decrements stream
// counters/stats, restores connected capacity where it had been limited by
// concurrency, then either closes a fully-drained Draining client or moves a
// no-longer-full Busy client back to a ready state. When
// `delay_attaching_stream` is set, the freed capacity is not immediately
// offered to pending streams.
void ConnPoolImplBase::onStreamClosed(Envoy::ConnectionPool::ActiveClient& client,
                                      bool delay_attaching_stream) {
  ENVOY_CONN_LOG(
      debug, "destroying stream: {} active remaining, readyForStream {}, currentUnusedCapacity {}",
      client, client.numActiveStreams(), client.readyForStream(), client.currentUnusedCapacity());
  ASSERT(num_active_streams_ > 0, dumpState());
  cluster_connectivity_state_.decrActiveStreams(1);
  num_active_streams_--;
  host_->stats().rq_active_.dec();
  host_->cluster().trafficStats()->upstream_rq_active_.dec();
  host_->cluster().resourceManager(priority_).requests().dec();
  // We don't update the capacity for HTTP/3 as the stream count should only
  // increase when a MAX_STREAMS frame is received.
  if (trackStreamCapacity()) {
    // If the effective client capacity was limited by concurrency, increase connected capacity.
    bool limited_by_concurrency =
        client.remaining_streams_ > client.concurrent_stream_limit_ - client.numActiveStreams() - 1;
    // The capacity calculated by concurrency could be negative if a SETTINGS frame lowered the
    // number of allowed streams. In this case, connecting_and_connected_stream_capacity_ can be
    // negative, and effective client capacity was still limited by concurrency. Compare
    // client.concurrent_stream_limit_ and client.numActiveStreams() directly to avoid overflow.
    bool negative_capacity = client.concurrent_stream_limit_ < client.numActiveStreams() + 1;
    if (negative_capacity || limited_by_concurrency) {
      incrConnectingAndConnectedStreamCapacity(1, client);
    }
  }
  if (client.state() == ActiveClient::State::Draining && client.numActiveStreams() == 0) {
    // Close out the draining client if we no longer have active streams.
    client.close();
  } else if (client.state() == ActiveClient::State::Busy && client.currentUnusedCapacity() > 0) {
    // The client was at capacity but now has room for another stream: move it
    // back to the list matching its handshake progress.
    if (!client.hasHandshakeCompleted()) {
      transitionActiveClientState(client, ActiveClient::State::ReadyForEarlyData);
      if (!delay_attaching_stream) {
        onUpstreamReadyForEarlyData(client);
      }
    } else {
      transitionActiveClientState(client, ActiveClient::State::Ready);
      if (!delay_attaching_stream) {
        onUpstreamReady();
      }
    }
  }
}
321

            
322
// Entry point for a new stream. Prefers an existing ready client, then (when
// permitted) an early-data client; otherwise queues the stream as pending and
// tries to create connection capacity for it, failing the stream when that is
// impossible. Returns a cancellation handle for a queued pending stream, or
// nullptr when the stream was attached or failed synchronously.
ConnectionPool::Cancellable* ConnPoolImplBase::newStreamImpl(AttachContext& context,
                                                             bool can_send_early_data) {
  ASSERT(!is_draining_for_deletion_, dumpState());
  ASSERT(!deferred_deleting_, dumpState());
  assertCapacityCountsAreCorrect();

  if (!ready_clients_.empty()) {
    ActiveClient& client = *ready_clients_.front();
    ENVOY_CONN_LOG(debug, "using existing fully connected connection", client);
    attachStreamToClient(client, context);
    // Even if there's a ready client, we may want to preconnect to handle the next incoming stream.
    tryCreateNewConnections();
    return nullptr;
  }

  if (can_send_early_data && !early_data_clients_.empty()) {
    ActiveClient& client = *early_data_clients_.front();
    ENVOY_CONN_LOG(debug, "using existing early data ready connection", client);
    attachStreamToClient(client, context);
    // Even if there's an available client, we may want to preconnect to handle the next
    // incoming stream.
    tryCreateNewConnections();
    return nullptr;
  }

  // Enforce the cluster's pending-request circuit breaker before queueing.
  if (!host_->cluster().resourceManager(priority_).pendingRequests().canCreate()) {
    ENVOY_LOG(debug, "max pending streams overflow");
    onPoolFailure(nullptr, absl::string_view(), ConnectionPool::PoolFailureReason::Overflow,
                  context);
    host_->cluster().trafficStats()->upstream_rq_pending_overflow_.inc();
    return nullptr;
  }

  ConnectionPool::Cancellable* pending = newPendingStream(context, can_send_early_data);
  ENVOY_LOG(debug, "trying to create new connection");
  ENVOY_LOG(trace, fmt::format("{}", *this));

  auto old_capacity = connecting_stream_capacity_;
  // This must come after newPendingStream() because this function uses the
  // length of pending_streams_ to determine if a new connection is needed.
  const ConnectionResult result = tryCreateNewConnections();
  // If there is not enough connecting capacity, the only reason to not
  // increase capacity is if the connection limits are exceeded or load shed is
  // triggered.
  ENVOY_BUG(pending_streams_.size() <= connecting_stream_capacity_ ||
                connecting_stream_capacity_ > old_capacity ||
                (result == ConnectionResult::NoConnectionRateLimited ||
                 result == ConnectionResult::FailedToCreateConnection ||
                 result == ConnectionResult::LoadShed),
            fmt::format("Failed to create expected connection: {}", *this));
  if (result == ConnectionResult::FailedToCreateConnection) {
    // This currently only happens for HTTP/3 if secrets aren't yet loaded.
    // Trigger connection failure.
    pending->cancel(Envoy::ConnectionPool::CancelPolicy::CloseExcess);
    onPoolFailure(nullptr, absl::string_view(),
                  ConnectionPool::PoolFailureReason::LocalConnectionFailure, context);
    return nullptr;
  } else if (result == ConnectionResult::LoadShed) {
    // The load shed point rejected the connection: fail the stream as an
    // overflow rather than leaving it queued with no capacity coming.
    pending->cancel(Envoy::ConnectionPool::CancelPolicy::CloseExcess);
    onPoolFailure(nullptr, absl::string_view(), ConnectionPool::PoolFailureReason::Overflow,
                  context);
    return nullptr;
  }
  return pending;
}
387

            
388
99
bool ConnPoolImplBase::maybePreconnectImpl(float global_preconnect_ratio) {
389
99
  ASSERT(!deferred_deleting_, dumpState());
390
99
  return tryCreateNewConnection(global_preconnect_ratio) == ConnectionResult::CreatedNewConnection;
391
99
}
392

            
393
24366
// Schedules onUpstreamReady() to run later in the current dispatcher
// iteration, so pending streams are attached outside the current call stack.
void ConnPoolImplBase::scheduleOnUpstreamReady() {
  upstream_ready_cb_->scheduleCallbackCurrentIteration();
}
396

            
397
54332
void ConnPoolImplBase::onUpstreamReady() {
398
85325
  while (!pending_streams_.empty() && !ready_clients_.empty()) {
399
30993
    ActiveClientPtr& client = ready_clients_.front();
400
30993
    ENVOY_CONN_LOG(debug, "attaching to next stream", *client);
401
    // Pending streams are pushed onto the front, so pull from the back.
402
30993
    attachStreamToClient(*client, pending_streams_.back()->context());
403
30993
    cluster_connectivity_state_.decrPendingStreams(1);
404
30993
    pending_streams_.pop_back();
405
30993
  }
406
54332
  if (!pending_streams_.empty()) {
407
1181
    tryCreateNewConnections();
408
1181
  }
409
54332
}
410

            
411
228891
// Maps a client state to the list that owns clients in that state. Busy and
// Draining clients share busy_clients_; Closed clients belong to no list and
// hit the PANIC below.
std::list<ActiveClientPtr>& ConnPoolImplBase::owningList(ActiveClient::State state) {
  switch (state) {
  case ActiveClient::State::Connecting:
    return connecting_clients_;
  case ActiveClient::State::ReadyForEarlyData:
    return early_data_clients_;
  case ActiveClient::State::Ready:
    return ready_clients_;
  case ActiveClient::State::Busy:
  case ActiveClient::State::Draining:
    return busy_clients_;
  case ActiveClient::State::Closed:
    break; // Fall through to PANIC.
  }
  PANIC("unexpected");
}
427

            
428
void ConnPoolImplBase::transitionActiveClientState(ActiveClient& client,
429
83331
                                                   ActiveClient::State new_state) {
430
83331
  auto& old_list = owningList(client.state());
431
83331
  auto& new_list = owningList(new_state);
432
83331
  client.setState(new_state);
433

            
434
  // old_list and new_list can be equal when transitioning from Busy to Draining.
435
  //
436
  // The documentation for list.splice() (which is what moveBetweenLists() calls) is
437
  // unclear whether it is allowed for src and dst to be the same, so check here
438
  // since it is a no-op anyways.
439
83331
  if (&old_list != &new_list) {
440
83319
    client.moveBetweenLists(old_list, new_list);
441
83319
  }
442
83331
}
443

            
444
12497
// Registers a callback to run when the pool becomes idle (see
// checkForIdleAndNotify(), which invokes the callbacks once and then clears
// them). `cb` is a sink parameter: move it into the list instead of copying.
void ConnPoolImplBase::addIdleCallbackImpl(Instance::IdleCb cb) {
  idle_callbacks_.push_back(std::move(cb));
}
445

            
446
240
void ConnPoolImplBase::closeIdleConnectionsForDrainingPool() {
447
240
  Common::AutoDebugRecursionChecker assert_not_in(recursion_checker_);
448

            
449
  // Create a separate list of elements to close to avoid mutate-while-iterating problems.
450
240
  std::list<ActiveClient*> to_close;
451

            
452
16223
  for (auto& client : ready_clients_) {
453
16180
    if (client->numActiveStreams() == 0) {
454
16172
      to_close.push_back(client.get());
455
16172
    }
456
16180
  }
457

            
458
240
  if (pending_streams_.empty()) {
459
226
    for (auto& client : connecting_clients_) {
460
15
      to_close.push_back(client.get());
461
15
    }
462
226
    for (auto& client : early_data_clients_) {
463
2
      if (client->numActiveStreams() == 0) {
464
1
        to_close.push_back(client.get());
465
1
      }
466
2
    }
467
226
  }
468

            
469
16226
  for (auto& entry : to_close) {
470
16188
    ENVOY_LOG_EVENT(debug, "closing_idle_client", "closing idle client {} for cluster {}",
471
16188
                    entry->id(), host_->cluster().name());
472
16188
    entry->close();
473
16188
  }
474
240
}
475

            
476
194
void ConnPoolImplBase::drainClients(std::list<ActiveClientPtr>& clients) {
477
198
  while (!clients.empty()) {
478
4
    ASSERT(clients.front()->numActiveStreams() > 0u, dumpState());
479
4
    ENVOY_LOG_EVENT(
480
4
        debug, "draining_non_idle_client", "draining {} client {} for cluster {}",
481
4
        (clients.front()->state() == ActiveClient::State::Ready ? "ready" : "early data"),
482
4
        clients.front()->id(), host_->cluster().name());
483
4
    transitionActiveClientState(*clients.front(), ActiveClient::State::Draining);
484
4
  }
485
194
}
486

            
487
217
// Implements pool draining. DrainAndDelete marks the pool as draining for
// deletion and reaps idle connections; other behaviors additionally move all
// remaining stream-serving clients into Draining so no new streams are
// assigned to them.
void ConnPoolImplBase::drainConnectionsImpl(DrainBehavior drain_behavior) {
  if (drain_behavior == Envoy::ConnectionPool::DrainBehavior::DrainAndDelete) {
    is_draining_for_deletion_ = true;
    checkForIdleAndCloseIdleConnsIfDraining();
    return;
  }
  closeIdleConnectionsForDrainingPool();
  // closeIdleConnectionsForDrainingPool() closes all connections in ready_clients_ with no active
  // streams and if no pending streams, all connections in early_data_clients_ with no active
  // streams as well, so all remaining entries in ready_clients_ are serving streams. Move them and
  // all entries in busy_clients_ to draining.
  if (pending_streams_.empty()) {
    // The remaining early data clients are non-idle.
    drainClients(early_data_clients_);
  }

  drainClients(ready_clients_);

  // Changing busy_clients_ to Draining does not move them between lists,
  // so use a for-loop since the list is not mutated.
  ASSERT(&owningList(ActiveClient::State::Draining) == &busy_clients_, dumpState());
  for (auto& busy_client : busy_clients_) {
    if (busy_client->state() == ActiveClient::State::Draining) {
      continue;
    }
    ENVOY_LOG_EVENT(debug, "draining_busy_client", "draining busy client {} for cluster {}",
                    busy_client->id(), host_->cluster().name());
    transitionActiveClientState(*busy_client, ActiveClient::State::Draining);
  }
}
517

            
518
133428
bool ConnPoolImplBase::isIdleImpl() const {
519
133428
  return pending_streams_.empty() && ready_clients_.empty() && busy_clients_.empty() &&
520
133428
         connecting_clients_.empty() && early_data_clients_.empty();
521
133428
}
522

            
523
/*
  This method may be invoked once or twice.
  It is called the first time in ConnPoolImplBase::onConnectionEvent for Local/RemoteClose events.
  The second time it is called from Envoy::Tcp::ActiveTcpClient::~ActiveTcpClient via
  ConnPoolImplBase::checkForIdleAndCloseIdleConnsIfDraining.

  The logic must be constructed in such a way that the method is safe to call once or twice.
  See the PR 30807 description for an explanation of why idle callbacks are deleted after being
  called.
*/
void ConnPoolImplBase::checkForIdleAndNotify() {
  if (isIdleImpl()) {
    ENVOY_LOG(debug, "invoking {} idle callback(s) - is_draining_for_deletion_={}",
              idle_callbacks_.size(), is_draining_for_deletion_);
    for (const Instance::IdleCb& cb : idle_callbacks_) {
      cb();
    }
    // Clear callbacks, so they are not executed if checkForIdleAndNotify is called again.
    idle_callbacks_.clear();
  }
}
543

            
544
77362
// When the pool is draining for deletion, first reaps idle connections, then
// runs the idle notification check (which fires and clears idle callbacks if
// the pool is now idle).
void ConnPoolImplBase::checkForIdleAndCloseIdleConnsIfDraining() {
  if (is_draining_for_deletion_) {
    closeIdleConnectionsForDrainingPool();
  }

  checkForIdleAndNotify();
}
551

            
552
// Central handler for upstream connection lifecycle events. Keeps the pool's
// capacity accounting (connecting_stream_capacity_,
// connecting_and_connected_stream_capacity_) consistent with client state, and
// drives pool-level reactions: purging pending streams on connect failure,
// dispatching streams on successful connect, and idle/drain bookkeeping.
// NOTE: statement order here is load-bearing; assertCapacityCountsAreCorrect()
// at the end validates the invariants.
void ConnPoolImplBase::onConnectionEvent(ActiveClient& client, absl::string_view failure_reason,
                                         Network::ConnectionEvent event) {
  switch (event) {
  case Network::ConnectionEvent::RemoteClose:
  case Network::ConnectionEvent::LocalClose: {
    // A live connect timer implies the handshake never completed; stop it so
    // it cannot fire for an already-dead client.
    if (client.connect_timer_) {
      ASSERT(!client.has_handshake_completed_);
      client.connect_timer_->disableTimer();
      client.connect_timer_.reset();
    }
    // Remove this client's remaining capacity from the pool-wide counters.
    decrConnectingAndConnectedStreamCapacity(client.currentUnusedCapacity(), client);

    // Make sure that onStreamClosed won't double count.
    client.remaining_streams_ = 0;
    // The client died.
    ENVOY_CONN_LOG(debug, "client disconnected, failure reason: {}", client, failure_reason);

    Envoy::Upstream::reportUpstreamCxDestroy(host_, event);
    const bool incomplete_stream = client.closingWithIncompleteStream();
    if (incomplete_stream) {
      Envoy::Upstream::reportUpstreamCxDestroyActiveRequest(host_, event);
    }

    // Handshake never completed: this close is a connect *failure*.
    if (!client.hasHandshakeCompleted()) {
      client.has_handshake_completed_ = true;
      host_->cluster().trafficStats()->upstream_cx_connect_fail_.inc();
      host_->stats().cx_connect_fail_.inc();

      onConnectFailed(client);
      // Map the failure cause to a pool failure reason: explicit connect
      // timeout wins, then remote vs. local close.
      ConnectionPool::PoolFailureReason reason;
      if (client.timed_out_) {
        reason = ConnectionPool::PoolFailureReason::Timeout;
      } else if (event == Network::ConnectionEvent::RemoteClose) {
        reason = ConnectionPool::PoolFailureReason::RemoteConnectionFailure;
      } else {
        reason = ConnectionPool::PoolFailureReason::LocalConnectionFailure;
      }

      // Raw connect failures should never happen under normal circumstances. If we have an
      // upstream that is behaving badly, streams can get stuck here in the pending state. If we
      // see a connect failure, we purge all pending streams so that calling code can determine
      // what to do with the stream.
      // NOTE: We move the existing pending streams to a temporary list. This is done so that
      //       if retry logic submits a new stream to the pool, we don't fail it inline.
      purgePendingStreams(client.real_host_description_, failure_reason, reason);
      // See if we should preconnect based on active connections.
      if (!is_draining_for_deletion_) {
        tryCreateNewConnections();
      }
    }

    // We need to release our resourceManager() resources before checking below for
    // whether we can create a new connection. Normally this would happen when
    // client's destructor runs, but this object needs to be deferredDelete'd(), so
    // this forces part of its cleanup to happen now.
    client.releaseResources();

    // Again, since we know this object is going to be deferredDelete'd(), we take
    // this opportunity to disable and reset the connection duration timer so that
    // it doesn't trigger while on the deferred delete list. In theory it is safe
    // to handle the Closed state in onConnectionDurationTimeout, but we handle
    // it here for simplicity and safety anyway.
    if (client.connection_duration_timer_) {
      client.connection_duration_timer_->disableTimer();
      client.connection_duration_timer_.reset();
    }

    dispatcher_.deferredDelete(client.removeFromList(owningList(client.state())));

    // Check if the pool transitioned to idle state after removing closed client
    // from one of the client tracking lists.
    // There is no need to check if other connections are idle in a draining pool
    // because the pool will close all idle connection when it is starting to
    // drain.
    // Trying to close other connections here can lead to deep recursion when
    // a large number idle connections are closed at the start of pool drain.
    // See CdsIntegrationTest.CdsClusterDownWithLotsOfIdleConnections for an example.
    checkForIdleAndNotify();

    client.setState(ActiveClient::State::Closed);

    // If we have pending streams and we just lost a connection we should make a new one.
    if (!pending_streams_.empty()) {
      tryCreateNewConnections();
    }
    break;
  }
  case Network::ConnectionEvent::Connected: {
    ASSERT(client.connect_timer_ != nullptr && !client.has_handshake_completed_);
    client.connect_timer_->disableTimer();
    client.connect_timer_.reset();

    // The client is no longer "connecting": move its capacity out of the
    // connecting-only counter (it remains in the combined counter).
    ENVOY_BUG(connecting_stream_capacity_ >= client.currentUnusedCapacity(), dumpState());
    connecting_stream_capacity_ -= client.currentUnusedCapacity();
    client.has_handshake_completed_ = true;
    client.conn_connect_ms_->complete();
    client.conn_connect_ms_.reset();
    if (client.state() == ActiveClient::State::Connecting ||
        client.state() == ActiveClient::State::ReadyForEarlyData) {
      transitionActiveClientState(client,
                                  (client.currentUnusedCapacity() > 0 ? ActiveClient::State::Ready
                                                                      : ActiveClient::State::Busy));
    }

    // Now that the active client is ready, set up a timer for max connection duration.
    const absl::optional<std::chrono::milliseconds> max_connection_duration =
        client.parent_.host()->cluster().maxConnectionDuration();
    if (max_connection_duration.has_value()) {
      client.connection_duration_timer_ = client.parent_.dispatcher().createTimer(
          [&client]() { client.onConnectionDurationTimeout(); });
      client.connection_duration_timer_->enableTimer(max_connection_duration.value());
    }
    // Initialize client read filters
    client.initializeReadFilters();

    // At this point, for the mixed ALPN pool, the client may be deleted. Do not
    // refer to client after this point.
    onConnected(client);
    if (client.readyForStream()) {
      onUpstreamReady();
    }
    checkForIdleAndCloseIdleConnsIfDraining();
    break;
  }
  case Network::ConnectionEvent::ConnectedZeroRtt: {
    ENVOY_CONN_LOG(debug, "0-RTT connected with capacity {}", client,
                   client.currentUnusedCapacity());
    // No need to update connecting capacity and connect_timer_ as the client is still connecting.
    ASSERT(client.state() == ActiveClient::State::Connecting);
    host()->cluster().trafficStats()->upstream_cx_connect_with_0_rtt_.inc();
    transitionActiveClientState(client, (client.currentUnusedCapacity() > 0
                                             ? ActiveClient::State::ReadyForEarlyData
                                             : ActiveClient::State::Busy));
    break;
  }
  }
  assertCapacityCountsAreCorrect();
}
693

            
694
// Registers a newly queued stream against pending-request stats and the
// cluster's circuit-breaker resource for pending requests. The destructor
// undoes the active-count increments.
PendingStream::PendingStream(ConnPoolImplBase& parent, bool can_send_early_data)
    : parent_(parent), can_send_early_data_(can_send_early_data) {
  auto& stats = *parent_.host()->cluster().trafficStats();
  stats.upstream_rq_pending_total_.inc();
  stats.upstream_rq_pending_active_.inc();
  parent_.host()->cluster().resourceManager(parent_.priority()).pendingRequests().inc();
}
701

            
702
31837
// Mirror the constructor: the stream no longer counts against the pending
// circuit-breaker resource or the active pending-request gauge.
PendingStream::~PendingStream() {
  parent_.host()->cluster().resourceManager(parent_.priority()).pendingRequests().dec();
  parent_.host()->cluster().trafficStats()->upstream_rq_pending_active_.dec();
}
706

            
707
540
// Cancels this pending stream by delegating to the owning pool, which removes
// it from the appropriate pending list and may close excess connections
// depending on `policy`.
void PendingStream::cancel(Envoy::ConnectionPool::CancelPolicy policy) {
  parent_.onPendingStreamCancel(*this, policy);
}
710

            
711
// Fails every currently pending stream with the given reason, invoking each
// stream's onPoolFailure callback exactly once.
void ConnPoolImplBase::purgePendingStreams(
    const Upstream::HostDescriptionConstSharedPtr& host_description,
    absl::string_view failure_reason, ConnectionPool::PoolFailureReason reason) {
  // NOTE: We move the existing pending streams to a temporary list. This is done so that
  //       if retry logic submits a new stream to the pool, we don't fail it inline.
  cluster_connectivity_state_.decrPendingStreams(pending_streams_.size());
  pending_streams_to_purge_ = std::move(pending_streams_);
  // Pop from the front each iteration rather than iterating: the onPoolFailure
  // callback may re-enter the pool (e.g. cancel another stream) and mutate
  // pending_streams_to_purge_, which would invalidate a held iterator.
  while (!pending_streams_to_purge_.empty()) {
    PendingStreamPtr stream =
        pending_streams_to_purge_.front()->removeFromList(pending_streams_to_purge_);
    host_->cluster().trafficStats()->upstream_rq_pending_failure_eject_.inc();
    onPoolFailure(host_description, failure_reason, reason, stream->context());
  }
}
725

            
726
30
bool ConnPoolImplBase::connectingConnectionIsExcess(const ActiveClient& client) const {
727
30
  ASSERT(!client.hasHandshakeCompleted());
728
30
  ENVOY_BUG(connecting_stream_capacity_ >= client.currentUnusedCapacity(), dumpState());
729
  // If perUpstreamPreconnectRatio is one, this simplifies to checking if there would still be
730
  // sufficient connecting stream capacity to serve all pending streams if the most recent client
731
  // were removed from the picture.
732
  //
733
  // If preconnect ratio is set, it also factors in the anticipated load based on both queued
734
  // streams and active streams, and makes sure the connecting capacity would still be sufficient to
735
  // serve that even with the most recent client removed.
736
30
  return (pending_streams_.size() + num_active_streams_) * perUpstreamPreconnectRatio() <=
737
30
         (connecting_stream_capacity_ - client.currentUnusedCapacity() + num_active_streams_);
738
30
}
739

            
740
// Removes a cancelled stream from whichever pending list currently owns it and,
// under the CloseExcess policy, drains at most one now-superfluous connecting
// (or idle early-data) client.
void ConnPoolImplBase::onPendingStreamCancel(PendingStream& stream,
                                             Envoy::ConnectionPool::CancelPolicy policy) {
  ENVOY_LOG(debug, "cancelling pending stream");
  if (!pending_streams_to_purge_.empty()) {
    // If pending_streams_to_purge_ is not empty, it means that we are called from
    // with-in a onPoolFailure callback invoked in purgePendingStreams (i.e. purgePendingStreams
    // is down in the call stack). Remove this stream from the list as it is cancelled,
    // and there is no need to call its onPoolFailure callback.
    stream.removeFromList(pending_streams_to_purge_);
  } else {
    cluster_connectivity_state_.decrPendingStreams(1);
    stream.removeFromList(pending_streams_);
  }
  if (policy == Envoy::ConnectionPool::CancelPolicy::CloseExcess) {
    // Prefer closing a plain connecting client; fall back to an idle
    // early-data client if there is no excess connecting client.
    if (!connecting_clients_.empty() &&
        connectingConnectionIsExcess(*connecting_clients_.front())) {
      auto& client = *connecting_clients_.front();
      transitionActiveClientState(client, ActiveClient::State::Draining);
      client.close();
    } else if (!early_data_clients_.empty()) {
      for (ActiveClientPtr& client : early_data_clients_) {
        if (client->numActiveStreams() == 0) {
          // Find an idle early data client and check if it is excess.
          if (connectingConnectionIsExcess(*client)) {
            // close() may remove the client from early_data_clients_; the
            // unconditional break below ensures the (now possibly invalid)
            // iterator is never advanced afterward.
            transitionActiveClientState(*client, ActiveClient::State::Draining);
            client->close();
          }
          break;
        }
      }
    }
  }

  host_->cluster().trafficStats()->upstream_rq_cancelled_.inc();
  checkForIdleAndCloseIdleConnsIfDraining();
}
777

            
778
void ConnPoolImplBase::decrConnectingAndConnectedStreamCapacity(uint32_t delta,
779
80020
                                                                ActiveClient& client) {
780
80020
  decrClusterStreamCapacity(delta);
781

            
782
80020
  if (!client.hasHandshakeCompleted()) {
783
    // If still doing handshake, it is contributing to the local connecting stream capacity. Update
784
    // the capacity as well.
785
405
    ENVOY_BUG(connecting_stream_capacity_ >= delta, dumpState());
786
405
    connecting_stream_capacity_ -= delta;
787
405
  }
788
80020
}
789

            
790
void ConnPoolImplBase::incrConnectingAndConnectedStreamCapacity(uint32_t delta,
791
75383
                                                                ActiveClient& client) {
792
75383
  incrClusterStreamCapacity(delta);
793

            
794
75383
  if (!client.hasHandshakeCompleted()) {
795
31053
    connecting_stream_capacity_ += delta;
796
31053
  }
797
75383
}
798

            
799
54
// Attaches early-data-capable pending streams to a 0-RTT client that still has
// unused capacity, scanning pending_streams_ from the back.
void ConnPoolImplBase::onUpstreamReadyForEarlyData(ActiveClient& client) {
  ASSERT(!client.hasHandshakeCompleted() && client.readyForStream());
  // Check pending streams backward for safe request.
  // Note that this is a O(n) search, but the expected size of pending_streams_ should be small. If
  // this becomes a problem, we could split pending_streams_ into 2 lists.
  auto it = pending_streams_.end();
  if (it == pending_streams_.begin()) {
    return;
  }
  --it;
  while (client.currentUnusedCapacity() > 0) {
    PendingStream& stream = **it;
    bool stop_iteration{false};
    // Step the iterator *before* possibly removing `stream` from the list, so
    // removal never invalidates the position we continue from; when we are
    // already at the front, flag the loop to end after handling this stream.
    if (it != pending_streams_.begin()) {
      --it;
    } else {
      stop_iteration = true;
    }

    if (stream.can_send_early_data_) {
      ENVOY_CONN_LOG(debug, "creating stream for early data.", client);
      attachStreamToClient(client, stream.context());
      cluster_connectivity_state_.decrPendingStreams(1);
      stream.removeFromList(pending_streams_);
    }
    if (stop_iteration) {
      return;
    }
  }
}
829

            
830
namespace {
// Interpret a configured limit of 0 as "unlimited" by mapping it to the
// maximum representable value, so callers never need a special-case branch.
uint32_t translateZeroToUnlimited(uint32_t limit) {
  return limit == 0 ? std::numeric_limits<uint32_t>::max() : limit;
}
} // namespace
837

            
838
// Convenience constructor: the effective concurrent-stream limit defaults to
// the configured concurrent-stream limit.
ActiveClient::ActiveClient(ConnPoolImplBase& parent, uint32_t lifetime_stream_limit,
                           uint32_t concurrent_stream_limit)
    : ActiveClient(parent, lifetime_stream_limit, concurrent_stream_limit,
                   concurrent_stream_limit) {}
842

            
843
// Primary constructor: normalizes the stream limits (0 means unlimited),
// arms the connect timeout, starts connect/length timing histograms, and
// registers the new connection with host/cluster stats and the
// circuit-breaker connection resource. ~ActiveClient/releaseResourcesBase()
// undo the gauge and resource increments.
ActiveClient::ActiveClient(ConnPoolImplBase& parent, uint32_t lifetime_stream_limit,
                           uint32_t effective_concurrent_streams, uint32_t concurrent_stream_limit)
    : parent_(parent), remaining_streams_(translateZeroToUnlimited(lifetime_stream_limit)),
      configured_stream_limit_(translateZeroToUnlimited(effective_concurrent_streams)),
      concurrent_stream_limit_(translateZeroToUnlimited(concurrent_stream_limit)),
      connect_timer_(parent_.dispatcher().createTimer([this]() { onConnectTimeout(); })) {
  conn_connect_ms_ = std::make_unique<Stats::HistogramCompletableTimespanImpl>(
      parent_.host()->cluster().trafficStats()->upstream_cx_connect_ms_,
      parent_.dispatcher().timeSource());
  conn_length_ = std::make_unique<Stats::HistogramCompletableTimespanImpl>(
      parent_.host()->cluster().trafficStats()->upstream_cx_length_ms_,
      parent_.dispatcher().timeSource());
  connect_timer_->enableTimer(parent_.host()->cluster().connectTimeout());
  parent_.host()->stats().cx_total_.inc();
  parent_.host()->stats().cx_active_.inc();
  parent_.host()->cluster().trafficStats()->upstream_cx_total_.inc();
  parent_.host()->cluster().trafficStats()->upstream_cx_active_.inc();
  parent_.host()->cluster().resourceManager(parent_.priority()).connections().inc();
}
862

            
863
31111
// releaseResourcesBase() is idempotent, so this is a no-op when
// releaseResources() already ran during deferred deletion.
ActiveClient::~ActiveClient() { releaseResourcesBase(); }
864

            
865
62162
void ActiveClient::releaseResourcesBase() {
866
62162
  if (!resources_released_) {
867
31111
    resources_released_ = true;
868

            
869
31111
    conn_length_->complete();
870

            
871
31111
    parent_.host()->cluster().trafficStats()->upstream_cx_active_.dec();
872
31111
    parent_.host()->stats().cx_active_.dec();
873
31111
    parent_.host()->cluster().resourceManager(parent_.priority()).connections().dec();
874
31111
  }
875
62162
}
876

            
877
8
// Fired when the connect timer expires before the handshake completes.
void ActiveClient::onConnectTimeout() {
  ENVOY_CONN_LOG(debug, "connect timeout", *this);
  parent_.host()->cluster().trafficStats()->upstream_cx_connect_timeout_.inc();
  // Record the timeout so the subsequent close is reported as
  // PoolFailureReason::Timeout rather than a generic connection failure.
  timed_out_ = true;
  close();
}
883

            
884
16
// Fired when the cluster's max connection duration elapses: transitions the
// client to Draining and closes it immediately if it has no active streams.
// The leading guards are defensive; the states they reject should have
// disarmed this timer already.
void ActiveClient::onConnectionDurationTimeout() {
  if (!hasHandshakeCompleted()) {
    // The connection duration timer should only have started after we were connected.
    ENVOY_BUG(false, "max connection duration reached while connecting");
    return;
  }

  if (state_ == ActiveClient::State::Closed) {
    // The connection duration timer should have been disabled and reset in onConnectionEvent
    // for closing connections.
    ENVOY_BUG(false, "max connection duration reached while closed");
    return;
  }

  // There's nothing to do if the client is draining.
  if (state_ == ActiveClient::State::Draining) {
    return;
  }

  ENVOY_CONN_LOG(debug, "max connection duration reached, start draining", *this);
  parent_.host()->cluster().trafficStats()->upstream_cx_max_duration_reached_.inc();
  parent_.transitionActiveClientState(*this, Envoy::ConnectionPool::ActiveClient::State::Draining);

  // Close out the draining client if we no longer have active streams.
  // We have to do this here because there won't be an onStreamClosed (because there are
  // no active streams) to do it for us later.
  if (numActiveStreams() == 0) {
    close();
  }
}
914

            
915
207
void ActiveClient::drain() {
916
207
  const int64_t unused = currentUnusedCapacity();
917

            
918
  // Remove draining client's capacity from the pool.
919
  //
920
  // The code that adds capacity back to the pool in `onStreamClosed` will only add it back if
921
  // it sees the connection as currently limited by concurrent capacity, not total lifetime streams.
922
  // Setting this to zero ensures that the `limited_by_concurrency` check does not detect this
923
  // connection as limited for that reason, because it is now being marked as having zero remaining
924
  // lifetime requests.
925
207
  remaining_streams_ = 0;
926

            
927
207
  if (unused > 0) {
928
167
    parent_.decrConnectingAndConnectedStreamCapacity(unused, *this);
929
167
  }
930
207
}
931

            
932
} // namespace ConnectionPool
933
} // namespace Envoy