Coverage Report

Created: 2026-03-11 06:25

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/grpc-swift/Sources/GRPC/ConnectionPool/ConnectionPool.swift
Line
Count
Source
1
/*
2
 * Copyright 2021, gRPC Authors All rights reserved.
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
17
import Atomics
18
import Logging
19
import NIOConcurrencyHelpers
20
import NIOCore
21
import NIOHTTP2
22
23
@usableFromInline
24
internal final class ConnectionPool {
25
  /// The event loop all connections in this pool are running on.
26
  @usableFromInline
27
  internal let eventLoop: EventLoop
28
29
  @usableFromInline
30
  internal enum State {
31
    case active
32
    case shuttingDown(EventLoopFuture<Void>)
33
    case shutdown
34
  }
35
36
  /// The state of the connection pool.
37
  @usableFromInline
38
0
  internal var _state: State = .active
39
40
  /// The most recent connection error we have observed.
41
  ///
42
  /// This error is used to provide additional context to failed waiters. A waiter may, for example,
43
  /// timeout because the pool is busy, or because no connection can be established because of an
44
  /// underlying connection error. In the latter case it's useful for the caller to know why the
45
  /// connection is failing at the RPC layer.
46
  ///
47
  /// This value is cleared when a connection becomes 'available'. That is, when we receive an
48
  /// http/2 SETTINGS frame.
49
  ///
50
  /// This value is set whenever an underlying connection transitions to the transient failure state
51
  /// or to the idle state and has an associated error.
52
  @usableFromInline
53
0
  internal var _mostRecentError: Error? = nil
54
55
  /// Connection managers and their stream availability state keyed by the ID of the connection
56
  /// manager.
57
  ///
58
  /// Connections are accessed by their ID for connection state changes (infrequent) and when
59
  /// streams are closed (frequent). However when choosing which connection to succeed a waiter
60
  /// with (frequent) requires the connections to be ordered by their availability. A dictionary
61
  /// might not be the most efficient data structure (a queue prioritised by stream availability may
62
  /// be a better choice given the number of connections is likely to be very low in practice).
63
  @usableFromInline
64
  internal var _connections: [ConnectionManagerID: PerConnectionState]
65
66
  /// The threshold which if exceeded when creating a stream determines whether the pool will
67
  /// start connecting an idle connection (if one exists).
68
  ///
69
  /// The 'load' is calculated as the ratio of demand for streams (the sum of the number of waiters
70
  /// and the number of reserved streams) and the total number of streams which non-idle connections
71
  /// could support (this includes the streams that a connection in the connecting state could
72
  /// support).
73
  @usableFromInline
74
  internal let reservationLoadThreshold: Double
75
76
  /// The assumed value for the maximum number of concurrent streams a connection can support. We
77
  /// assume a connection will support this many streams until we know better.
78
  @usableFromInline
79
  internal let assumedMaxConcurrentStreams: Int
80
81
  /// A queue of waiters which may or may not get a stream in the future.
82
  @usableFromInline
83
  internal var waiters: CircularBuffer<Waiter>
84
85
  /// The maximum number of waiters allowed, the size of `waiters` must not exceed this value. If
86
  /// there are this many waiters in the queue then the next waiter will be failed immediately.
87
  @usableFromInline
88
  internal let maxWaiters: Int
89
90
  /// The number of connections in the pool that should always be kept open (i.e. they won't go idle).
91
  /// In other words, it's the number of connections for which we should ignore idle timers.
92
  @usableFromInline
93
  internal let minConnections: Int
94
95
  /// Configuration for backoff between subsequence connection attempts.
96
  @usableFromInline
97
  internal let connectionBackoff: ConnectionBackoff
98
99
  /// Provides a channel factory to the `ConnectionManager`.
100
  @usableFromInline
101
  internal let channelProvider: ConnectionManagerChannelProvider
102
103
  /// The object to notify about changes to stream reservations; in practice this is usually
104
  /// the `PoolManager`.
105
  @usableFromInline
106
  internal let streamLender: StreamLender
107
108
  @usableFromInline
109
  internal var delegate: GRPCConnectionPoolDelegate?
110
111
  /// A logger.
112
  @usableFromInline
113
  internal let logger: Logger
114
115
  /// Returns `NIODeadline` representing 'now'. This is useful for testing.
116
  @usableFromInline
117
  internal let now: () -> NIODeadline
118
119
  /// The ID of this sub-pool.
120
  @usableFromInline
121
  internal let id: GRPCSubPoolID
122
123
  /// Logging metadata keys.
124
  @usableFromInline
125
  internal enum Metadata {
126
    /// The ID of this pool.
127
    @usableFromInline
128
    static let id = "pool.id"
129
    /// The number of stream reservations (i.e. number of open streams + number of waiters).
130
    @usableFromInline
131
    static let reservationsCount = "pool.reservations.count"
132
    /// The number of streams this pool can support with non-idle connections at this time.
133
    @usableFromInline
134
    static let reservationsCapacity = "pool.reservations.capacity"
135
    /// The current reservation load (i.e. reservation count / reservation capacity)
136
    @usableFromInline
137
    static let reservationsLoad = "pool.reservations.load"
138
    /// The reservation load threshold, above which a new connection will be created (if possible).
139
    @usableFromInline
140
    static let reservationsLoadThreshold = "pool.reservations.loadThreshold"
141
    /// The current number of waiters in the pool.
142
    @usableFromInline
143
    static let waitersCount = "pool.waiters.count"
144
    /// The maximum number of waiters the pool is configured to allow.
145
    @usableFromInline
146
    static let waitersMax = "pool.waiters.max"
147
    /// The number of waiters which were successfully serviced.
148
    @usableFromInline
149
    static let waitersServiced = "pool.waiters.serviced"
150
    /// The ID of waiter.
151
    @usableFromInline
152
    static let waiterID = "pool.waiter.id"
153
    /// The maximum number of connections allowed in the pool.
154
    @usableFromInline
155
    static let connectionsMax = "pool.connections.max"
156
    /// The number of connections in the ready state.
157
    @usableFromInline
158
    static let connectionsReady = "pool.connections.ready"
159
    /// The number of connections in the connecting state.
160
    @usableFromInline
161
    static let connectionsConnecting = "pool.connections.connecting"
162
    /// The number of connections in the transient failure state.
163
    @usableFromInline
164
    static let connectionsTransientFailure = "pool.connections.transientFailure"
165
  }
166
167
  @usableFromInline
168
  init(
169
    eventLoop: EventLoop,
170
    maxWaiters: Int,
171
    minConnections: Int,
172
    reservationLoadThreshold: Double,
173
    assumedMaxConcurrentStreams: Int,
174
    connectionBackoff: ConnectionBackoff,
175
    channelProvider: ConnectionManagerChannelProvider,
176
    streamLender: StreamLender,
177
    delegate: GRPCConnectionPoolDelegate?,
178
    logger: Logger,
179
0
    now: @escaping () -> NIODeadline = NIODeadline.now
180
0
  ) {
181
0
    precondition(
182
0
      (0.0 ... 1.0).contains(reservationLoadThreshold),
183
0
      "reservationLoadThreshold must be within the range 0.0 ... 1.0"
184
0
    )
185
0
186
0
    self.reservationLoadThreshold = reservationLoadThreshold
187
0
    self.assumedMaxConcurrentStreams = assumedMaxConcurrentStreams
188
0
189
0
    self._connections = [:]
190
0
    self.maxWaiters = maxWaiters
191
0
    self.minConnections = minConnections
192
0
    self.waiters = CircularBuffer(initialCapacity: 16)
193
0
194
0
    self.eventLoop = eventLoop
195
0
    self.connectionBackoff = connectionBackoff
196
0
    self.channelProvider = channelProvider
197
0
    self.streamLender = streamLender
198
0
    self.delegate = delegate
199
0
    self.now = now
200
0
201
0
    let id = GRPCSubPoolID.next()
202
0
    var logger = logger
203
0
    logger[metadataKey: Metadata.id] = "\(id)"
204
0
205
0
    self.id = id
206
0
    self.logger = logger
207
0
  }
208
209
  /// Initialize the connection pool.
210
  ///
211
  /// - Parameter connections: The number of connections to add to the pool.
212
0
  internal func initialize(connections: Int) {
213
0
    assert(self._connections.isEmpty)
214
0
    self.logger.debug(
215
0
      "initializing new sub-pool",
216
0
      metadata: [
217
0
        Metadata.waitersMax: .stringConvertible(self.maxWaiters),
218
0
        Metadata.connectionsMax: .stringConvertible(connections),
219
0
      ]
220
0
    )
221
0
    self._connections.reserveCapacity(connections)
222
0
    var numberOfKeepOpenConnections = self.minConnections
223
0
    while self._connections.count < connections {
224
0
      // If we have less than the minimum number of connections, don't let
225
0
      // the new connection close when idle.
226
0
      let idleBehavior =
227
0
        numberOfKeepOpenConnections > 0
228
0
        ? ConnectionManager.IdleBehavior.neverGoIdle : .closeWhenIdleTimeout
229
0
      numberOfKeepOpenConnections -= 1
230
0
      self.addConnectionToPool(idleBehavior: idleBehavior)
231
0
    }
232
0
  }
233
234
  /// Make and add a new connection to the pool.
235
0
  private func addConnectionToPool(idleBehavior: ConnectionManager.IdleBehavior) {
236
0
    let manager = ConnectionManager(
237
0
      eventLoop: self.eventLoop,
238
0
      channelProvider: self.channelProvider,
239
0
      callStartBehavior: .waitsForConnectivity,
240
0
      idleBehavior: idleBehavior,
241
0
      connectionBackoff: self.connectionBackoff,
242
0
      connectivityDelegate: self,
243
0
      http2Delegate: self,
244
0
      logger: self.logger
245
0
    )
246
0
    let id = manager.id
247
0
    self._connections[id] = PerConnectionState(manager: manager)
248
0
    self.delegate?.connectionAdded(id: .init(id))
249
0
250
0
    // If it's one of the connections that should be kept open, then connect
251
0
    // straight away.
252
0
    switch idleBehavior {
253
0
    case .neverGoIdle:
254
0
      self.eventLoop.execute {
255
0
        if manager.sync.isIdle {
256
0
          manager.sync.startConnecting()
257
0
        }
258
0
      }
259
0
    case .closeWhenIdleTimeout:
260
0
      ()
261
0
    }
262
0
  }
263
264
  // MARK: - Called from the pool manager
265
266
  /// Make and initialize an HTTP/2 stream `Channel`.
267
  ///
268
  /// - Parameters:
269
  ///   - deadline: The point in time by which the `promise` must have been resolved.
270
  ///   - promise: A promise for a `Channel`.
271
  ///   - logger: A request logger.
272
  ///   - initializer: A closure to initialize the `Channel` with.
273
  @inlinable
274
  internal func makeStream(
275
    deadline: NIODeadline,
276
    promise: EventLoopPromise<Channel>,
277
    logger: Logger,
278
    initializer: @escaping @Sendable (Channel) -> EventLoopFuture<Void>
279
0
  ) {
280
0
    if self.eventLoop.inEventLoop {
281
0
      self._makeStream(
282
0
        deadline: deadline,
283
0
        promise: promise,
284
0
        logger: logger,
285
0
        initializer: initializer
286
0
      )
287
0
    } else {
288
0
      self.eventLoop.execute {
289
0
        self._makeStream(
290
0
          deadline: deadline,
291
0
          promise: promise,
292
0
          logger: logger,
293
0
          initializer: initializer
294
0
        )
295
0
      }
296
0
    }
297
0
  }
298
299
  /// See `makeStream(deadline:promise:logger:initializer:)`.
300
  @inlinable
301
  internal func makeStream(
302
    deadline: NIODeadline,
303
    logger: Logger,
304
    initializer: @escaping @Sendable (Channel) -> EventLoopFuture<Void>
305
0
  ) -> EventLoopFuture<Channel> {
306
0
    let promise = self.eventLoop.makePromise(of: Channel.self)
307
0
    self.makeStream(deadline: deadline, promise: promise, logger: logger, initializer: initializer)
308
0
    return promise.futureResult
309
0
  }
310
311
  /// Shutdown the connection pool.
312
  ///
313
  /// Existing waiters will be failed and all underlying connections will be shutdown. Subsequent
314
  /// calls to `makeStream` will be failed immediately.
315
  ///
316
  /// - Parameter mode: The mode to use when shutting down.
317
  /// - Returns: A future indicated when shutdown has been completed.
318
0
  internal func shutdown(mode: ConnectionManager.ShutdownMode) -> EventLoopFuture<Void> {
319
0
    let promise = self.eventLoop.makePromise(of: Void.self)
320
0
321
0
    if self.eventLoop.inEventLoop {
322
0
      self._shutdown(mode: mode, promise: promise)
323
0
    } else {
324
0
      self.eventLoop.execute {
325
0
        self._shutdown(mode: mode, promise: promise)
326
0
      }
327
0
    }
328
0
329
0
    return promise.futureResult
330
0
  }
331
332
  /// See `makeStream(deadline:promise:logger:initializer:)`.
333
  ///
334
  /// - Important: Must be called on the pool's `EventLoop`.
335
  @inlinable
336
  internal func _makeStream(
337
    deadline: NIODeadline,
338
    promise: EventLoopPromise<Channel>,
339
    logger: Logger,
340
    initializer: @escaping @Sendable (Channel) -> EventLoopFuture<Void>
341
0
  ) {
342
0
    self.eventLoop.assertInEventLoop()
343
0
344
0
    guard case .active = self._state else {
345
0
      // Fail the promise right away if we're shutting down or already shut down.
346
0
      promise.fail(GRPCConnectionPoolError.shutdown)
347
0
      return
348
0
    }
349
0
350
0
    // Try to make a stream on an existing connection.
351
0
    let streamCreated = self._tryMakeStream(promise: promise, initializer: initializer)
352
0
353
0
    if !streamCreated {
354
0
      // No stream was created, wait for one.
355
0
      self._enqueueWaiter(
356
0
        deadline: deadline,
357
0
        promise: promise,
358
0
        logger: logger,
359
0
        initializer: initializer
360
0
      )
361
0
    }
362
0
  }
363
364
  /// Try to find an existing connection on which we can make a stream.
365
  ///
366
  /// - Parameters:
367
  ///   - promise: A promise to succeed if we can make a stream.
368
  ///   - initializer: A closure to initialize the stream with.
369
  /// - Returns: A boolean value indicating whether the stream was created or not.
370
  @inlinable
371
  internal func _tryMakeStream(
372
    promise: EventLoopPromise<Channel>,
373
    initializer: @escaping @Sendable (Channel) -> EventLoopFuture<Void>
374
0
  ) -> Bool {
375
0
    // We shouldn't jump the queue.
376
0
    guard self.waiters.isEmpty else {
377
0
      return false
378
0
    }
379
0
380
0
    // Reserve a stream, if we can.
381
0
    guard let multiplexer = self._reserveStreamFromMostAvailableConnection() else {
382
0
      return false
383
0
    }
384
0
385
0
    multiplexer.createStreamChannel(promise: promise, initializer)
386
0
387
0
    // Has reserving another stream tipped us over the limit for needing another connection?
388
0
    if self._shouldBringUpAnotherConnection() {
389
0
      self._startConnectingIdleConnection()
390
0
    }
391
0
392
0
    return true
393
0
  }
394
395
  /// Enqueue a waiter to be provided with a stream at some point in the future.
396
  ///
397
  /// - Parameters:
398
  ///   - deadline: The point in time by which the promise should have been completed.
399
  ///   - promise: The promise to complete with the `Channel`.
400
  ///   - logger: A logger.
401
  ///   - initializer: A closure to initialize the `Channel` with.
402
  @inlinable
403
  internal func _enqueueWaiter(
404
    deadline: NIODeadline,
405
    promise: EventLoopPromise<Channel>,
406
    logger: Logger,
407
    initializer: @escaping @Sendable (Channel) -> EventLoopFuture<Void>
408
0
  ) {
409
0
    // Don't overwhelm the pool with too many waiters.
410
0
    guard self.waiters.count < self.maxWaiters else {
411
0
      logger.trace(
412
0
        "connection pool has too many waiters",
413
0
        metadata: [
414
0
          Metadata.waitersMax: .stringConvertible(self.maxWaiters)
415
0
        ]
416
0
      )
417
0
      promise.fail(GRPCConnectionPoolError.tooManyWaiters(connectionError: self._mostRecentError))
418
0
      return
419
0
    }
420
0
421
0
    let waiter = Waiter(deadline: deadline, promise: promise, channelInitializer: initializer)
422
0
423
0
    // Fail the waiter and punt it from the queue when it times out. It's okay that we schedule the
424
0
    // timeout before appending it to the waiters, it wont run until the next event loop tick at the
425
0
    // earliest (even if the deadline has already passed).
426
0
    waiter.scheduleTimeout(on: self.eventLoop) {
427
0
      waiter.fail(GRPCConnectionPoolError.deadlineExceeded(connectionError: self._mostRecentError))
428
0
429
0
      if let index = self.waiters.firstIndex(where: { $0.id == waiter.id }) {
430
0
        self.waiters.remove(at: index)
431
0
432
0
        logger.trace(
433
0
          "timed out waiting for a connection",
434
0
          metadata: [
435
0
            Metadata.waiterID: "\(waiter.id)",
436
0
            Metadata.waitersCount: .stringConvertible(self.waiters.count),
437
0
          ]
438
0
        )
439
0
      }
440
0
    }
441
0
442
0
    // request logger
443
0
    logger.debug(
444
0
      "waiting for a connection to become available",
445
0
      metadata: [
446
0
        Metadata.waiterID: "\(waiter.id)",
447
0
        Metadata.waitersCount: .stringConvertible(self.waiters.count),
448
0
      ]
449
0
    )
450
0
451
0
    self.waiters.append(waiter)
452
0
453
0
    // pool logger
454
0
    self.logger.trace(
455
0
      "enqueued connection waiter",
456
0
      metadata: [
457
0
        Metadata.waitersCount: .stringConvertible(self.waiters.count)
458
0
      ]
459
0
    )
460
0
461
0
    if self._shouldBringUpAnotherConnection() {
462
0
      self._startConnectingIdleConnection()
463
0
    }
464
0
  }
465
466
  /// Compute the current demand and capacity for streams.
467
  ///
468
  /// The 'demand' for streams is the number of reserved streams and the number of waiters. The
469
  /// capacity for streams is the product of max concurrent streams and the number of non-idle
470
  /// connections.
471
  ///
472
  /// - Returns: A tuple of the demand and capacity for streams.
473
  @usableFromInline
474
0
  internal func _computeStreamDemandAndCapacity() -> (demand: Int, capacity: Int) {
475
0
    let demand = self.sync.reservedStreams + self.sync.waiters
476
0
477
0
    // TODO: make this cheaper by storing and incrementally updating the number of idle connections
478
0
    let capacity = self._connections.values.reduce(0) { sum, state in
479
0
      if state.manager.sync.isIdle || state.isQuiescing {
480
0
        // Idle connection or quiescing (so the capacity should be ignored).
481
0
        return sum
482
0
      } else if let knownMaxAvailableStreams = state.maxAvailableStreams {
483
0
        // A known value of max concurrent streams, i.e. the connection is active.
484
0
        return sum + knownMaxAvailableStreams
485
0
      } else {
486
0
        // Not idle and no known value, the connection must be connecting so use our assumed value.
487
0
        return sum + self.assumedMaxConcurrentStreams
488
0
      }
489
0
    }
490
0
491
0
    return (demand, capacity)
492
0
  }
493
494
  /// Returns whether the pool should start connecting an idle connection (if one exists).
495
  @usableFromInline
496
0
  internal func _shouldBringUpAnotherConnection() -> Bool {
497
0
    let (demand, capacity) = self._computeStreamDemandAndCapacity()
498
0
499
0
    // Infinite -- i.e. all connections are idle or no connections exist -- is okay here as it
500
0
    // will always be greater than the threshold and a new connection will be spun up.
501
0
    let load = Double(demand) / Double(capacity)
502
0
    let loadExceedsThreshold = load >= self.reservationLoadThreshold
503
0
504
0
    if loadExceedsThreshold {
505
0
      self.logger.debug(
506
0
        "stream reservation load factor greater than or equal to threshold, bringing up additional connection if available",
507
0
        metadata: [
508
0
          Metadata.reservationsCount: .stringConvertible(demand),
509
0
          Metadata.reservationsCapacity: .stringConvertible(capacity),
510
0
          Metadata.reservationsLoad: .stringConvertible(load),
511
0
          Metadata.reservationsLoadThreshold: .stringConvertible(self.reservationLoadThreshold),
512
0
        ]
513
0
      )
514
0
    }
515
0
516
0
    return loadExceedsThreshold
517
0
  }
518
519
  /// Starts connecting an idle connection, if one exists.
520
  @usableFromInline
521
0
  internal func _startConnectingIdleConnection() {
522
0
    if let index = self._connections.values.firstIndex(where: { $0.manager.sync.isIdle }) {
523
0
      self._connections.values[index].manager.sync.startConnecting()
524
0
    } else {
525
0
      let connecting = self._connections.values.count { $0.manager.sync.isConnecting }
526
0
      let ready = self._connections.values.count { $0.manager.sync.isReady }
527
0
      let transientFailure = self._connections.values.count { $0.manager.sync.isTransientFailure }
528
0
529
0
      self.logger.debug(
530
0
        "no idle connections in pool",
531
0
        metadata: [
532
0
          Metadata.connectionsConnecting: .stringConvertible(connecting),
533
0
          Metadata.connectionsReady: .stringConvertible(ready),
534
0
          Metadata.connectionsTransientFailure: .stringConvertible(transientFailure),
535
0
          Metadata.waitersCount: .stringConvertible(self.waiters.count),
536
0
        ]
537
0
      )
538
0
    }
539
0
  }
540
541
  /// Returns the index in `self.connections.values` of the connection with the most available
542
  /// streams. Returns `self.connections.endIndex` if no connection has at least one stream
543
  /// available.
544
  ///
545
  /// - Note: this is linear in the number of connections.
546
  @usableFromInline
547
  internal func _mostAvailableConnectionIndex()
548
    -> Dictionary<ConnectionManagerID, PerConnectionState>.Index
549
0
  {
550
0
    var index = self._connections.values.startIndex
551
0
    var selectedIndex = self._connections.values.endIndex
552
0
    var mostAvailableStreams = 0
553
0
554
0
    while index != self._connections.values.endIndex {
555
0
      let availableStreams = self._connections.values[index].availableStreams
556
0
      if availableStreams > mostAvailableStreams {
557
0
        mostAvailableStreams = availableStreams
558
0
        selectedIndex = index
559
0
      }
560
0
561
0
      self._connections.values.formIndex(after: &index)
562
0
    }
563
0
564
0
    return selectedIndex
565
0
  }
566
567
  /// Reserves a stream from the connection with the most available streams, if one exists.
568
  ///
569
  /// - Returns: The `HTTP2StreamMultiplexer` from the connection the stream was reserved from,
570
  ///     or `nil` if no stream could be reserved.
571
  @usableFromInline
572
0
  internal func _reserveStreamFromMostAvailableConnection() -> HTTP2StreamMultiplexer? {
573
0
    let index = self._mostAvailableConnectionIndex()
574
0
575
0
    if index != self._connections.endIndex {
576
0
      // '!' is okay here; the most available connection must have at least one stream available
577
0
      // to reserve.
578
0
      return self._connections.values[index].reserveStream()!
579
0
    } else {
580
0
      return nil
581
0
    }
582
0
  }
583
584
  /// See `shutdown(mode:)`.
585
  ///
586
  /// - Parameter promise: A `promise` to complete when the pool has been shutdown.
587
  @usableFromInline
588
0
  internal func _shutdown(mode: ConnectionManager.ShutdownMode, promise: EventLoopPromise<Void>) {
589
0
    self.eventLoop.assertInEventLoop()
590
0
591
0
    switch self._state {
592
0
    case .active:
593
0
      self.logger.debug("shutting down connection pool")
594
0
595
0
      // We're shutting down now and when that's done we'll be fully shutdown.
596
0
      self._state = .shuttingDown(promise.futureResult)
597
0
      promise.futureResult.whenComplete { _ in
598
0
        self._state = .shutdown
599
0
        self.delegate = nil
600
0
        self.logger.trace("finished shutting down connection pool")
601
0
      }
602
0
603
0
      // Shutdown all the connections and remove them from the pool.
604
0
      let connections = self._connections
605
0
      self._connections.removeAll()
606
0
607
0
      let allShutdown: [EventLoopFuture<Void>] = connections.values.map {
608
0
        let id = $0.manager.id
609
0
        let manager = $0.manager
610
0
611
0
        return manager.eventLoop.flatSubmit {
612
0
          // If the connection was idle/shutdown before calling shutdown then we shouldn't tell
613
0
          // the delegate the connection closed (because it either never connected or was already
614
0
          // informed about this).
615
0
          let connectionIsInactive = manager.sync.isIdle || manager.sync.isShutdown
616
0
          return manager.shutdown(mode: mode).always { _ in
617
0
            if !connectionIsInactive {
618
0
              self.delegate?.connectionClosed(id: .init(id), error: nil)
619
0
            }
620
0
            self.delegate?.connectionRemoved(id: .init(id))
621
0
          }
622
0
        }
623
0
      }
624
0
625
0
      // Fail the outstanding waiters.
626
0
      while let waiter = self.waiters.popFirst() {
627
0
        waiter.fail(GRPCConnectionPoolError.shutdown)
628
0
      }
629
0
630
0
      // Cascade the result of the shutdown into the promise.
631
0
      EventLoopFuture.andAllSucceed(allShutdown, promise: promise)
632
0
633
0
    case let .shuttingDown(future):
634
0
      // We're already shutting down, cascade the result.
635
0
      promise.completeWith(future)
636
0
637
0
    case .shutdown:
638
0
      // Already shutdown, fine.
639
0
      promise.succeed(())
640
0
    }
641
0
  }
642
643
0
  internal func stats() -> EventLoopFuture<GRPCSubPoolStats> {
644
0
    let promise = self.eventLoop.makePromise(of: GRPCSubPoolStats.self)
645
0
646
0
    if self.eventLoop.inEventLoop {
647
0
      self._stats(promise: promise)
648
0
    } else {
649
0
      self.eventLoop.execute {
650
0
        self._stats(promise: promise)
651
0
      }
652
0
    }
653
0
654
0
    return promise.futureResult
655
0
  }
656
657
0
  private func _stats(promise: EventLoopPromise<GRPCSubPoolStats>) {
658
0
    self.eventLoop.assertInEventLoop()
659
0
660
0
    var stats = GRPCSubPoolStats(id: self.id)
661
0
662
0
    for connection in self._connections.values {
663
0
      let sync = connection.manager.sync
664
0
      if sync.isIdle {
665
0
        stats.connectionStates.idle += 1
666
0
      } else if sync.isConnecting {
667
0
        stats.connectionStates.connecting += 1
668
0
      } else if sync.isReady {
669
0
        stats.connectionStates.ready += 1
670
0
      } else if sync.isTransientFailure {
671
0
        stats.connectionStates.transientFailure += 1
672
0
      }
673
0
674
0
      stats.streamsInUse += connection.reservedStreams
675
0
      stats.streamsFreeToUse += connection.availableStreams
676
0
    }
677
0
678
0
    stats.rpcsWaiting += self.waiters.count
679
0
680
0
    promise.succeed(stats)
681
0
  }
682
}
683
684
extension ConnectionPool: ConnectionManagerConnectivityDelegate {
685
  // We're interested in a few different situations here:
686
  //
687
  // 1. The connection was usable ('ready') and is no longer usable (either it became idle or
688
  //    encountered an error. If this happens we need to notify any connections of the change as
689
  //    they may no longer be used for new RPCs.
690
  // 2. The connection was not usable but moved to a different unusable state. If this happens and
691
  //    we know the cause of the state transition (i.e. the error) then we need to update our most
692
  //    recent error with the error. This information is used when failing waiters to provide some
693
  //    context as to why they may be failing.
694
  func connectionStateDidChange(
695
    _ manager: ConnectionManager,
696
    from oldState: _ConnectivityState,
697
    to newState: _ConnectivityState
698
0
  ) {
699
0
    switch (oldState, newState) {
700
0
    case let (.ready, .transientFailure(error)),
701
0
      let (.ready, .idle(.some(error))):
702
0
      self.updateMostRecentError(error)
703
0
      self.connectionUnavailable(manager.id)
704
0
705
0
    case (.ready, .idle(.none)),
706
0
      (.ready, .shutdown):
707
0
      self.connectionUnavailable(manager.id)
708
0
709
0
    case let (_, .transientFailure(error)),
710
0
      let (_, .idle(.some(error))):
711
0
      self.updateMostRecentError(error)
712
0
713
0
    default:
714
0
      ()
715
0
    }
716
0
717
0
    guard let delegate = self.delegate else { return }
718
0
719
0
    switch (oldState, newState) {
720
0
    case (.idle, .connecting),
721
0
      (.transientFailure, .connecting):
722
0
      delegate.startedConnecting(id: .init(manager.id))
723
0
724
0
    case (.connecting, .ready):
725
0
      // The connection becoming ready is handled by 'receivedSettingsMaxConcurrentStreams'.
726
0
      ()
727
0
728
0
    case (.ready, .idle):
729
0
      delegate.connectionClosed(id: .init(manager.id), error: nil)
730
0
731
0
    case let (.ready, .transientFailure(error)):
732
0
      delegate.connectionClosed(id: .init(manager.id), error: error)
733
0
734
0
    case let (.connecting, .transientFailure(error)):
735
0
      delegate.connectFailed(id: .init(manager.id), error: error)
736
0
737
0
    default:
738
0
      ()
739
0
    }
740
0
  }
741
742
0
  func connectionIsQuiescing(_ manager: ConnectionManager) {
743
0
    self.eventLoop.assertInEventLoop()
744
0
745
0
    // Find the relevant connection.
746
0
    guard let index = self._connections.index(forKey: manager.id) else {
747
0
      return
748
0
    }
749
0
750
0
    // Drop the connectivity delegate, we're no longer interested in its events now.
751
0
    manager.sync.connectivityDelegate = nil
752
0
753
0
    // Started quiescing; update our state and notify the pool delegate.
754
0
    self._connections.values[index].isQuiescing = true
755
0
    self.delegate?.connectionQuiescing(id: .init(manager.id))
756
0
757
0
    // As the connection is quescing, we need to know when the current connection its managing has
758
0
    // closed. When that happens drop the H2 delegate and update the pool delegate.
759
0
    manager.onCurrentConnectionClose { hadActiveConnection in
760
0
      assert(hadActiveConnection)
761
0
      if let removed = self._connections.removeValue(forKey: manager.id) {
762
0
        removed.manager.sync.http2Delegate = nil
763
0
        removed.manager.sync.shutdownNow()  // Manager may have internal state to tear down.
764
0
        self.delegate?.connectionClosed(id: .init(removed.manager.id), error: nil)
765
0
        self.delegate?.connectionRemoved(id: .init(removed.manager.id))
766
0
      }
767
0
    }
768
0
769
0
    // Grab the number of reserved streams (before invalidating the index by adding a connection).
770
0
    let reservedStreams = self._connections.values[index].reservedStreams
771
0
772
0
    // Replace the connection with a new idle one. Keep the idle behavior, so that
773
0
    // if it's a connection that should be kept alive, we maintain it.
774
0
    self.addConnectionToPool(idleBehavior: manager.idleBehavior)
775
0
776
0
    // Since we're removing this connection from the pool (and no new streams can be created on
777
0
    // the connection), the pool manager can ignore any streams reserved against this connection.
778
0
    // We do still care about the number of reserved streams for the connection though
779
0
    //
780
0
    // Note: we don't need to adjust the number of available streams as the effective number of
781
0
    // connections hasn't changed.
782
0
    self.streamLender.returnStreams(reservedStreams, to: self)
783
0
  }
784
785
0
  private func updateMostRecentError(_ error: Error) {
786
0
    self.eventLoop.assertInEventLoop()
787
0
    // Update the last known error if there is one. We will use it to provide some context to
788
0
    // waiters which may fail.
789
0
    self._mostRecentError = error
790
0
  }
791
792
  /// A connection has become unavailable.
793
0
  private func connectionUnavailable(_ id: ConnectionManagerID) {
794
0
    self.eventLoop.assertInEventLoop()
795
0
    // The connection is no longer available: any streams which haven't been closed will be counted
796
0
    // as a dropped reservation, we need to tell the pool manager about them.
797
0
    if let droppedReservations = self._connections[id]?.unavailable(), droppedReservations > 0 {
798
0
      self.streamLender.returnStreams(droppedReservations, to: self)
799
0
    }
800
0
  }
801
}
802
803
extension ConnectionPool: ConnectionManagerHTTP2Delegate {
804
0
  internal func streamOpened(_ manager: ConnectionManager) {
805
0
    self.eventLoop.assertInEventLoop()
806
0
    if let utilization = self._connections[manager.id]?.openedStream(),
807
0
      let delegate = self.delegate
808
0
    {
809
0
      delegate.connectionUtilizationChanged(
810
0
        id: .init(manager.id),
811
0
        streamsUsed: utilization.used,
812
0
        streamCapacity: utilization.capacity
813
0
      )
814
0
    }
815
0
  }
816
817
0
  internal func streamClosed(_ manager: ConnectionManager) {
818
0
    self.eventLoop.assertInEventLoop()
819
0
820
0
    guard let index = self._connections.index(forKey: manager.id) else {
821
0
      return
822
0
    }
823
0
824
0
    // Return the stream the connection and to the pool manager.
825
0
    if let utilization = self._connections.values[index].returnStream(),
826
0
      let delegate = self.delegate
827
0
    {
828
0
      delegate.connectionUtilizationChanged(
829
0
        id: .init(manager.id),
830
0
        streamsUsed: utilization.used,
831
0
        streamCapacity: utilization.capacity
832
0
      )
833
0
    }
834
0
835
0
    // Return the stream to the pool manager if the connection is available and not quiescing. For
836
0
    // quiescing connections streams were returned when the connection started quiescing.
837
0
    if self._connections.values[index].isAvailable, !self._connections.values[index].isQuiescing {
838
0
      self.streamLender.returnStreams(1, to: self)
839
0
840
0
      // A stream was returned: we may be able to service a waiter now.
841
0
      self.tryServiceWaiters()
842
0
    }
843
0
  }
844
845
  internal func receivedSettingsMaxConcurrentStreams(
846
    _ manager: ConnectionManager,
847
    maxConcurrentStreams: Int
848
0
  ) {
849
0
    self.eventLoop.assertInEventLoop()
850
0
851
0
    // Find the relevant connection.
852
0
    guard let index = self._connections.index(forKey: manager.id) else {
853
0
      return
854
0
    }
855
0
856
0
    // When the connection is quiescing, the pool manager is not interested in updates to the
857
0
    // connection, bail out early.
858
0
    if self._connections.values[index].isQuiescing {
859
0
      return
860
0
    }
861
0
862
0
    // If we received a SETTINGS update then a connection is okay: drop the last known error.
863
0
    self._mostRecentError = nil
864
0
865
0
    let previous = self._connections.values[index].updateMaxConcurrentStreams(maxConcurrentStreams)
866
0
    let delta: Int
867
0
868
0
    if let previousValue = previous {
869
0
      // There was a previous value of max concurrent streams, i.e. a change in value for an
870
0
      // existing connection.
871
0
      delta = maxConcurrentStreams - previousValue
872
0
    } else {
873
0
      // There was no previous value so this must be a new connection. We'll compare against our
874
0
      // assumed default.
875
0
      delta = maxConcurrentStreams - self.assumedMaxConcurrentStreams
876
0
      // Notify the delegate.
877
0
      self.delegate?.connectSucceeded(id: .init(manager.id), streamCapacity: maxConcurrentStreams)
878
0
    }
879
0
880
0
    if delta != 0 {
881
0
      self.streamLender.changeStreamCapacity(by: delta, for: self)
882
0
    }
883
0
884
0
    // We always check, even if `delta` isn't greater than zero as this might be a new connection.
885
0
    self.tryServiceWaiters()
886
0
  }
887
}
888
889
extension ConnectionPool {
890
  // MARK: - Waiters
891
892
  /// Try to service as many waiters as possible.
893
  ///
894
  /// This an expensive operation, in the worst case it will be `O(W ⨉ N)` where `W` is the number
895
  /// of waiters and `N` is the number of connections.
896
0
  private func tryServiceWaiters() {
897
0
    if self.waiters.isEmpty { return }
898
0
899
0
    self.logger.trace(
900
0
      "servicing waiters",
901
0
      metadata: [
902
0
        Metadata.waitersCount: .stringConvertible(self.waiters.count)
903
0
      ]
904
0
    )
905
0
906
0
    let now = self.now()
907
0
    var serviced = 0
908
0
909
0
    while !self.waiters.isEmpty {
910
0
      if self.waiters.first!.deadlineIsAfter(now) {
911
0
        if let multiplexer = self._reserveStreamFromMostAvailableConnection() {
912
0
          // The waiter's deadline is in the future, and we have a suitable connection. Remove and
913
0
          // succeed the waiter.
914
0
          let waiter = self.waiters.removeFirst()
915
0
          serviced &+= 1
916
0
          waiter.succeed(with: multiplexer)
917
0
        } else {
918
0
          // There are waiters but no available connections, we're done.
919
0
          break
920
0
        }
921
0
      } else {
922
0
        // The waiter's deadline has already expired, there's no point completing it. Remove it and
923
0
        // let its scheduled timeout fail the promise.
924
0
        self.waiters.removeFirst()
925
0
      }
926
0
    }
927
0
928
0
    self.logger.trace(
929
0
      "done servicing waiters",
930
0
      metadata: [
931
0
        Metadata.waitersCount: .stringConvertible(self.waiters.count),
932
0
        Metadata.waitersServiced: .stringConvertible(serviced),
933
0
      ]
934
0
    )
935
0
  }
936
}
937
938
extension ConnectionPool {
939
  /// Synchronous operations for the pool, mostly used by tests.
940
  internal struct Sync {
941
    private let pool: ConnectionPool
942
943
0
    fileprivate init(_ pool: ConnectionPool) {
944
0
      self.pool = pool
945
0
    }
946
947
    /// The number of outstanding connection waiters.
948
0
    internal var waiters: Int {
949
0
      self.pool.eventLoop.assertInEventLoop()
950
0
      return self.pool.waiters.count
951
0
    }
952
953
    /// The number of connection currently in the pool (in any state).
954
0
    internal var connections: Int {
955
0
      self.pool.eventLoop.assertInEventLoop()
956
0
      return self.pool._connections.count
957
0
    }
958
959
    /// The number of idle connections in the pool.
960
0
    internal var idleConnections: Int {
961
0
      self.pool.eventLoop.assertInEventLoop()
962
0
      return self.pool._connections.values.reduce(0) { $0 &+ ($1.manager.sync.isIdle ? 1 : 0) }
963
0
    }
964
965
    /// The number of active (i.e. connecting or ready) connections in the pool.
966
0
    internal var activeConnections: Int {
967
0
      self.pool.eventLoop.assertInEventLoop()
968
0
      return self.pool._connections.values.reduce(0) {
969
0
        $0 &+ (($1.manager.sync.isReady || $1.manager.sync.isConnecting) ? 1 : 0)
970
0
      }
971
0
    }
972
973
    /// The number of connections in the pool in transient failure state.
974
0
    internal var transientFailureConnections: Int {
975
0
      self.pool.eventLoop.assertInEventLoop()
976
0
      return self.pool._connections.values.reduce(0) {
977
0
        $0 &+ ($1.manager.sync.isTransientFailure ? 1 : 0)
978
0
      }
979
0
    }
980
981
    /// The number of streams currently available to reserve across all connections in the pool.
982
0
    internal var availableStreams: Int {
983
0
      self.pool.eventLoop.assertInEventLoop()
984
0
      return self.pool._connections.values.reduce(0) { $0 + $1.availableStreams }
985
0
    }
986
987
    /// The number of streams which have been reserved across all connections in the pool.
988
0
    internal var reservedStreams: Int {
989
0
      self.pool.eventLoop.assertInEventLoop()
990
0
      return self.pool._connections.values.reduce(0) { $0 + $1.reservedStreams }
991
0
    }
992
993
    /// Updates the most recent connection error.
994
0
    internal func updateMostRecentError(_ error: Error) {
995
0
      self.pool.eventLoop.assertInEventLoop()
996
0
      self.pool.updateMostRecentError(error)
997
0
    }
998
  }
999
1000
0
  internal var sync: Sync {
1001
0
    return Sync(self)
1002
0
  }
1003
}
1004
1005
/// An error thrown from the ``GRPCChannelPool``.
1006
public struct GRPCConnectionPoolError: Error, CustomStringConvertible {
1007
  public struct Code: Hashable, Sendable, CustomStringConvertible {
1008
    enum Code {
1009
      case shutdown
1010
      case tooManyWaiters
1011
      case deadlineExceeded
1012
    }
1013
1014
    fileprivate var code: Code
1015
1016
0
    private init(_ code: Code) {
1017
0
      self.code = code
1018
0
    }
1019
1020
0
    public var description: String {
1021
0
      String(describing: self.code)
1022
0
    }
1023
1024
    /// The pool is shutdown or shutting down.
1025
0
    public static var shutdown: Self { Self(.shutdown) }
1026
1027
    /// There are too many waiters in the pool.
1028
0
    public static var tooManyWaiters: Self { Self(.tooManyWaiters) }
1029
1030
    /// The deadline for creating a stream has passed.
1031
0
    public static var deadlineExceeded: Self { Self(.deadlineExceeded) }
1032
  }
1033
1034
  /// The error code.
1035
  public var code: Code
1036
1037
  /// An underlying error which caused this error to be thrown.
1038
  public var underlyingError: Error?
1039
1040
0
  public var description: String {
1041
0
    if let underlyingError = self.underlyingError {
1042
0
      return "\(self.code) (\(underlyingError))"
1043
0
    } else {
1044
0
      return String(describing: self.code)
1045
0
    }
1046
0
  }
1047
1048
  /// Create a new connection pool error with the given code and underlying error.
1049
  ///
1050
  /// - Parameters:
1051
  ///   - code: The error code.
1052
  ///   - underlyingError: The underlying error which led to this error being thrown.
1053
0
  public init(code: Code, underlyingError: Error? = nil) {
1054
0
    self.code = code
1055
0
    self.underlyingError = underlyingError
1056
0
  }
1057
}
1058
1059
extension GRPCConnectionPoolError {
1060
  @usableFromInline
1061
  static let shutdown = Self(code: .shutdown)
1062
1063
  @inlinable
1064
0
  static func tooManyWaiters(connectionError: Error?) -> Self {
1065
0
    Self(code: .tooManyWaiters, underlyingError: connectionError)
1066
0
  }
1067
1068
  @inlinable
1069
0
  static func deadlineExceeded(connectionError: Error?) -> Self {
1070
0
    Self(code: .deadlineExceeded, underlyingError: connectionError)
1071
0
  }
1072
}
1073
1074
extension GRPCConnectionPoolError: GRPCStatusTransformable {
1075
0
  public func makeGRPCStatus() -> GRPCStatus {
1076
0
    switch self.code.code {
1077
0
    case .shutdown:
1078
0
      return GRPCStatus(
1079
0
        code: .unavailable,
1080
0
        message: "The connection pool is shutdown",
1081
0
        cause: self.underlyingError
1082
0
      )
1083
0
1084
0
    case .tooManyWaiters:
1085
0
      return GRPCStatus(
1086
0
        code: .resourceExhausted,
1087
0
        message: "The connection pool has no capacity for new RPCs or RPC waiters",
1088
0
        cause: self.underlyingError
1089
0
      )
1090
0
1091
0
    case .deadlineExceeded:
1092
0
      return GRPCStatus(
1093
0
        code: .deadlineExceeded,
1094
0
        message: "Timed out waiting for an HTTP/2 stream from the connection pool",
1095
0
        cause: self.underlyingError
1096
0
      )
1097
0
    }
1098
0
  }
1099
}
1100
1101
extension Sequence {
1102
0
  fileprivate func count(where predicate: (Element) -> Bool) -> Int {
1103
0
    return self.reduce(0) { count, element in
1104
0
      predicate(element) ? count + 1 : count
1105
0
    }
1106
0
  }
1107
}