Coverage Report

Created: 2026-03-11 06:25

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/grpc-swift/Sources/GRPC/ConnectionPool/PoolManagerStateMachine.swift
Line
Count
Source
1
/*
2
 * Copyright 2021, gRPC Authors All rights reserved.
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
import NIOCore
17
18
@usableFromInline
19
internal struct PoolManagerStateMachine {
20
  /// The current state.
21
  @usableFromInline
22
  internal var state: State
23
24
  /// Creates a state machine which starts out in the given state.
  ///
  /// - Parameter state: The state the machine should initially hold.
  @usableFromInline
  internal init(_ state: State) {
    self.state = state
  }
28
29
  /// The states the pool manager can be in.
  @usableFromInline
  internal enum State {
    /// Pools have not been activated yet; `activatePools` is only valid from this state.
    case inactive

    /// Pools have been activated; the associated value holds the per-pool bookkeeping.
    case active(ActiveState)

    /// Shutdown was requested while active. The associated future is the `futureResult` of the
    /// promise passed to `shutdown(promise:)`.
    case shuttingDown(EventLoopFuture<Void>)

    /// Terminal state, entered from `shuttingDown` via `shutdownComplete()` or directly from
    /// `inactive` via `shutdown(promise:)`.
    case shutdown

    /// Placeholder stored in `self.state` while `modifyingState(_:)` runs its closure. Observing
    /// this state anywhere else is treated as a programmer error.
    case _modifying
  }
37
38
  /// The bookkeeping held while the pool manager is active.
  @usableFromInline
  internal struct ActiveState {
    /// Per-pool state, keyed by the event loop ID taken from each pool key.
    @usableFromInline
    internal var pools: [EventLoopID: PerPoolState]

    /// An optional repeated task supplied at activation. It is handed back to the caller via
    /// `ShutdownAction.shutdownPools` when shutdown begins.
    @usableFromInline
    internal var statsTask: RepeatedTask?

    /// Creates the active state with one `PerPoolState` per pool key.
    ///
    /// - Parameters:
    ///   - poolKeys: The index and event loop ID of each pool. Event loop IDs must be unique:
    ///       the dictionary is built with `Dictionary(uniqueKeysWithValues:)`.
    ///   - assumedMaxAvailableStreamsPerPool: The assumed capacity given to every pool.
    ///   - statsTask: An optional repeated task to store alongside the pools.
    @usableFromInline
    internal init(
      poolKeys: [PoolManager.ConnectionPoolKey],
      assumedMaxAvailableStreamsPerPool: Int,
      statsTask: RepeatedTask?
    ) {
      let entries = poolKeys.map { poolKey -> (EventLoopID, PerPoolState) in
        let perPool = PerPoolState(
          poolIndex: poolKey.index,
          assumedMaxAvailableStreams: assumedMaxAvailableStreamsPerPool
        )
        return (poolKey.eventLoopID, perPool)
      }

      self.pools = Dictionary(uniqueKeysWithValues: entries)
      self.statsTask = statsTask
    }
  }
64
65
  /// Runs `modify` against the current state while `self.state` temporarily holds `._modifying`,
  /// then stores the (possibly updated) state back into `self.state`.
  ///
  /// - Parameter modify: A closure which may inspect and mutate the state it is handed.
  /// - Returns: The value returned by `modify`.
  @inlinable
  internal mutating func modifyingState<Result>(_ modify: (inout State) -> Result) -> Result {
    // Move the current state out of `self`, leaving the placeholder behind.
    var current = State._modifying
    swap(&self.state, &current)

    // `modify` cannot throw, so writing the state back explicitly covers every exit path.
    let result = modify(&current)
    self.state = current
    return result
  }
76
77
  /// Whether shutdown has already completed or is currently in progress.
  @usableFromInline
  internal var isShutdownOrShuttingDown: Bool {
    switch self.state {
    case .inactive, .active:
      return false

    case .shutdown, .shuttingDown:
      return true

    case ._modifying:
      // The placeholder state must never be observable from outside `modifyingState(_:)`.
      preconditionFailure()
    }
  }
89
90
  /// Activate the pool manager by providing the keys of its connection pools.
  ///
  /// Must only be called while the manager is inactive; any other state is a precondition
  /// failure.
  ///
  /// - Parameters:
  ///   - keys: The index and `EventLoopID` of the pools.
  ///   - capacity: The *assumed* maximum number of streams concurrently available to a pool (that
  ///       is, the product of the assumed value of max concurrent streams and the number of
  ///       connections per pool).
  ///   - statsTask: An optional repeated task to store in the active state.
  @usableFromInline
  internal mutating func activatePools(
    keyedBy keys: [PoolManager.ConnectionPoolKey],
    assumingPerPoolCapacity capacity: Int,
    statsTask: RepeatedTask?
  ) {
    self.modifyingState { state in
      // Activation is only valid from the inactive state.
      guard case .inactive = state else {
        preconditionFailure()
      }

      state = .active(
        ActiveState(
          poolKeys: keys,
          assumedMaxAvailableStreamsPerPool: capacity,
          statsTask: statsTask
        )
      )
    }
  }
118
119
  /// Select a connection pool and reserve a stream from it.
  ///
  /// - Parameter eventLoopID: The ID of the event loop whose pool should be tried first, if any.
  /// - Returns: The index of the pool the stream was reserved from, `.notInitialized` if the
  ///     manager is inactive, or `.shutdown` if it is shutting down or already shutdown.
  @inlinable
  mutating func reserveStream(
    preferringPoolWithEventLoopID eventLoopID: EventLoopID?
  ) -> Result<PoolManager.ConnectionPoolIndex, PoolManagerError> {
    return self.modifyingState { state in
      switch state {
      case .inactive:
        return .failure(.notInitialized)

      case .shuttingDown, .shutdown:
        return .failure(.shutdown)

      case ._modifying:
        preconditionFailure()

      case var .active(active):
        let reservedIndex: PoolManager.ConnectionPoolIndex

        if let preferredID = eventLoopID,
          let preferredIndex = active.reserveStreamFromPool(onEventLoopWithID: preferredID)
        {
          reservedIndex = preferredIndex
        } else {
          // No preference, or nothing on the preferred event loop: fall back to the pool with
          // the most available streams.
          reservedIndex = active.reserveStreamFromPoolWithMostAvailableStreams()
        }

        state = .active(active)
        return .success(reservedIndex)
      }
    }
  }
153
154
  /// Hand `count` previously reserved streams back to the pool on the given event loop.
  ///
  /// Does nothing when the manager is shutting down or shutdown. Calling this while inactive is
  /// a precondition failure.
  ///
  /// - Parameters:
  ///   - count: The number of streams being returned.
  ///   - eventLoopID: The ID of the event loop of the pool receiving the streams.
  mutating func returnStreams(_ count: Int, toPoolOnEventLoopWithID eventLoopID: EventLoopID) {
    self.modifyingState { state in
      switch state {
      case .inactive, ._modifying:
        // If the manager is inactive there are no pools which can return streams.
        preconditionFailure()

      case .shuttingDown, .shutdown:
        // Nothing to account for any more.
        break

      case .active(var active):
        active.returnStreams(count, toPoolOnEventLoopWithID: eventLoopID)
        state = .active(active)
      }
    }
  }
171
172
  /// Adjust the stream capacity recorded for the pool on the given event loop.
  ///
  /// Does nothing when the manager is shutting down or shutdown. Calling this while inactive is
  /// a precondition failure.
  ///
  /// - Parameters:
  ///   - delta: The amount added to the pool's maximum available streams.
  ///   - eventLoopID: The ID of the event loop of the pool whose capacity changed.
  mutating func changeStreamCapacity(
    by delta: Int,
    forPoolOnEventLoopWithID eventLoopID: EventLoopID
  ) {
    self.modifyingState { state in
      switch state {
      case .inactive, ._modifying:
        // If the manager is inactive there are no pools which can update their capacity.
        preconditionFailure()

      case .shuttingDown, .shutdown:
        // Capacity changes are no longer tracked.
        break

      case .active(var active):
        active.increaseMaxAvailableStreams(by: delta, forPoolOnEventLoopWithID: eventLoopID)
        state = .active(active)
      }
    }
  }
192
193
  /// The outcome of calling `shutdown(promise:)`.
  enum ShutdownAction {
    /// The manager was active and has moved to shutting down. Carries the `statsTask` held by
    /// the active state, if any.
    case shutdownPools(RepeatedTask?)

    /// The manager was inactive or had already finished shutting down.
    case alreadyShutdown

    /// A shutdown was already in progress. Carries the future stored when that shutdown began.
    case alreadyShuttingDown(EventLoopFuture<Void>)
  }
198
199
0
  /// Request shutdown of the pool manager.
  ///
  /// - Parameter promise: A promise whose `futureResult` is stored in the `shuttingDown` state
  ///     when the manager transitions out of `active`. The state machine itself never completes
  ///     the promise.
  /// - Returns: The action describing which transition, if any, took place.
  mutating func shutdown(promise: EventLoopPromise<Void>) -> ShutdownAction {
    return self.modifyingState { state in
      switch state {
      case ._modifying:
        preconditionFailure()

      case .shutdown:
        return .alreadyShutdown

      case .shuttingDown(let inProgress):
        return .alreadyShuttingDown(inProgress)

      case .inactive:
        // There are no pools to shut down, so go straight to the terminal state.
        state = .shutdown
        return .alreadyShutdown

      case .active(let active):
        let statsTask = active.statsTask
        state = .shuttingDown(promise.futureResult)
        return .shutdownPools(statsTask)
      }
    }
  }
221
222
0
  /// Record that shutdown has finished, moving from `shuttingDown` to `shutdown`.
  ///
  /// Calling this in any state other than `shuttingDown` is a precondition failure.
  mutating func shutdownComplete() {
    self.modifyingState { state in
      guard case .shuttingDown = state else {
        preconditionFailure()
      }

      state = .shutdown
    }
  }
233
}
234
235
extension PoolManagerStateMachine.ActiveState {
236
  /// Reserve a stream from the pool keyed by the given event loop ID.
  ///
  /// - Parameter eventLoopID: The ID of the event loop whose pool should be used.
  /// - Returns: The value returned by that pool's `reserveStream()`, or `nil` if there is no
  ///     pool for `eventLoopID`.
  @usableFromInline
  mutating func reserveStreamFromPool(
    onEventLoopWithID eventLoopID: EventLoopID
  ) -> PoolManager.ConnectionPoolIndex? {
    guard let poolPosition = self.pools.index(forKey: eventLoopID) else {
      return nil
    }

    return self.pools.values[poolPosition].reserveStream()
  }
242
243
  /// Reserve a stream from whichever pool currently reports the most available streams.
  ///
  /// When several pools tie, the first one encountered in the dictionary's iteration order is
  /// used.
  ///
  /// - Returns: The value returned by the chosen pool's `reserveStream()`.
  @usableFromInline
  mutating func reserveStreamFromPoolWithMostAvailableStreams() -> PoolManager.ConnectionPoolIndex {
    // We don't allow pools to be empty (while active).
    assert(!self.pools.isEmpty)

    let end = self.pools.values.endIndex
    var bestPosition = self.pools.values.startIndex
    var bestCount = Int.min
    var cursor = bestPosition

    while cursor != end {
      let candidateCount = self.pools.values[cursor].availableStreams

      // Strictly greater: an earlier pool wins ties.
      if candidateCount > bestCount {
        bestCount = candidateCount
        bestPosition = cursor
      }

      cursor = self.pools.values.index(after: cursor)
    }

    return self.pools.values[bestPosition].reserveStream()
  }
265
266
  /// Return `count` reserved streams to the pool keyed by the given event loop ID.
  ///
  /// Does nothing if there is no pool for `eventLoopID`.
  ///
  /// - Parameters:
  ///   - count: The number of streams being returned.
  ///   - eventLoopID: The ID of the event loop of the pool receiving the streams.
  mutating func returnStreams(
    _ count: Int,
    toPoolOnEventLoopWithID eventLoopID: EventLoopID
  ) {
    guard let poolPosition = self.pools.index(forKey: eventLoopID) else {
      return
    }

    self.pools.values[poolPosition].returnReservedStreams(count)
  }
272
273
  /// Add `delta` to the maximum available streams of the pool keyed by the given event loop ID.
  ///
  /// Does nothing if there is no pool for `eventLoopID`.
  ///
  /// - Parameters:
  ///   - delta: The amount to add to the pool's `maxAvailableStreams`.
  ///   - eventLoopID: The ID of the event loop of the pool to update.
  mutating func increaseMaxAvailableStreams(
    by delta: Int,
    forPoolOnEventLoopWithID eventLoopID: EventLoopID
  ) {
    guard let poolPosition = self.pools.index(forKey: eventLoopID) else {
      return
    }

    self.pools.values[poolPosition].maxAvailableStreams += delta
  }
279
}