/src/grpc-swift/Sources/GRPC/ConnectionPool/ConnectionPool.swift
Line | Count | Source |
1 | | /* |
2 | | * Copyright 2021, gRPC Authors All rights reserved. |
3 | | * |
4 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | | * you may not use this file except in compliance with the License. |
6 | | * You may obtain a copy of the License at |
7 | | * |
8 | | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | | * |
10 | | * Unless required by applicable law or agreed to in writing, software |
11 | | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | | * See the License for the specific language governing permissions and |
14 | | * limitations under the License. |
15 | | */ |
16 | | |
17 | | import Atomics |
18 | | import Logging |
19 | | import NIOConcurrencyHelpers |
20 | | import NIOCore |
21 | | import NIOHTTP2 |
22 | | |
23 | | @usableFromInline |
@usableFromInline
internal final class ConnectionPool {
  /// The event loop all connections in this pool are running on.
  @usableFromInline
  internal let eventLoop: EventLoop

  /// The lifecycle state of the pool as a whole.
  @usableFromInline
  internal enum State {
    /// The pool is accepting work; `makeStream` may succeed.
    case active
    /// Shutdown has started; the future completes when shutdown has finished.
    case shuttingDown(EventLoopFuture<Void>)
    /// Shutdown has completed; all subsequent `makeStream` calls fail immediately.
    case shutdown
  }

  /// The state of the connection pool.
  @usableFromInline
  internal var _state: State = .active

  /// The most recent connection error we have observed.
  ///
  /// This error is used to provide additional context to failed waiters. A waiter may, for example,
  /// timeout because the pool is busy, or because no connection can be established because of an
  /// underlying connection error. In the latter case it's useful for the caller to know why the
  /// connection is failing at the RPC layer.
  ///
  /// This value is cleared when a connection becomes 'available'. That is, when we receive an
  /// http/2 SETTINGS frame.
  ///
  /// This value is set whenever an underlying connection transitions to the transient failure state
  /// or to the idle state and has an associated error.
  @usableFromInline
  internal var _mostRecentError: Error? = nil

  /// Connection managers and their stream availability state keyed by the ID of the connection
  /// manager.
  ///
  /// Connections are accessed by their ID for connection state changes (infrequent) and when
  /// streams are closed (frequent). However when choosing which connection to succeed a waiter
  /// with (frequent) requires the connections to be ordered by their availability. A dictionary
  /// might not be the most efficient data structure (a queue prioritised by stream availability may
  /// be a better choice given the number of connections is likely to be very low in practice).
  @usableFromInline
  internal var _connections: [ConnectionManagerID: PerConnectionState]

  /// The threshold which if exceeded when creating a stream determines whether the pool will
  /// start connecting an idle connection (if one exists).
  ///
  /// The 'load' is calculated as the ratio of demand for streams (the sum of the number of waiters
  /// and the number of reserved streams) and the total number of streams which non-idle connections
  /// could support (this includes the streams that a connection in the connecting state could
  /// support).
  @usableFromInline
  internal let reservationLoadThreshold: Double

  /// The assumed value for the maximum number of concurrent streams a connection can support. We
  /// assume a connection will support this many streams until we know better.
  @usableFromInline
  internal let assumedMaxConcurrentStreams: Int

  /// A queue of waiters which may or may not get a stream in the future.
  @usableFromInline
  internal var waiters: CircularBuffer<Waiter>

  /// The maximum number of waiters allowed, the size of `waiters` must not exceed this value. If
  /// there are this many waiters in the queue then the next waiter will be failed immediately.
  @usableFromInline
  internal let maxWaiters: Int

  /// The number of connections in the pool that should always be kept open (i.e. they won't go idle).
  /// In other words, it's the number of connections for which we should ignore idle timers.
  @usableFromInline
  internal let minConnections: Int

  /// Configuration for backoff between subsequent connection attempts.
  @usableFromInline
  internal let connectionBackoff: ConnectionBackoff

  /// Provides a channel factory to the `ConnectionManager`.
  @usableFromInline
  internal let channelProvider: ConnectionManagerChannelProvider

  /// The object to notify about changes to stream reservations; in practice this is usually
  /// the `PoolManager`.
  @usableFromInline
  internal let streamLender: StreamLender

  /// A delegate notified about connection-level pool events; set to `nil` once the pool has
  /// shut down (see `_shutdown(mode:promise:)`).
  @usableFromInline
  internal var delegate: GRPCConnectionPoolDelegate?

  /// A logger.
  @usableFromInline
  internal let logger: Logger

  /// Returns `NIODeadline` representing 'now'. This is useful for testing.
  @usableFromInline
  internal let now: () -> NIODeadline

  /// The ID of this sub-pool.
  @usableFromInline
  internal let id: GRPCSubPoolID

  /// Logging metadata keys.
  @usableFromInline
  internal enum Metadata {
    /// The ID of this pool.
    @usableFromInline
    static let id = "pool.id"
    /// The number of stream reservations (i.e. number of open streams + number of waiters).
    @usableFromInline
    static let reservationsCount = "pool.reservations.count"
    /// The number of streams this pool can support with non-idle connections at this time.
    @usableFromInline
    static let reservationsCapacity = "pool.reservations.capacity"
    /// The current reservation load (i.e. reservation count / reservation capacity)
    @usableFromInline
    static let reservationsLoad = "pool.reservations.load"
    /// The reservation load threshold, above which a new connection will be created (if possible).
    @usableFromInline
    static let reservationsLoadThreshold = "pool.reservations.loadThreshold"
    /// The current number of waiters in the pool.
    @usableFromInline
    static let waitersCount = "pool.waiters.count"
    /// The maximum number of waiters the pool is configured to allow.
    @usableFromInline
    static let waitersMax = "pool.waiters.max"
    /// The number of waiters which were successfully serviced.
    @usableFromInline
    static let waitersServiced = "pool.waiters.serviced"
    /// The ID of waiter.
    @usableFromInline
    static let waiterID = "pool.waiter.id"
    /// The maximum number of connections allowed in the pool.
    @usableFromInline
    static let connectionsMax = "pool.connections.max"
    /// The number of connections in the ready state.
    @usableFromInline
    static let connectionsReady = "pool.connections.ready"
    /// The number of connections in the connecting state.
    @usableFromInline
    static let connectionsConnecting = "pool.connections.connecting"
    /// The number of connections in the transient failure state.
    @usableFromInline
    static let connectionsTransientFailure = "pool.connections.transientFailure"
  }
166 | | |
  /// Creates a new connection sub-pool.
  ///
  /// - Parameters:
  ///   - eventLoop: The `EventLoop` all connections in this pool run on.
  ///   - maxWaiters: The maximum number of waiters which may be queued at any one time.
  ///   - minConnections: The number of connections which should never go idle.
  ///   - reservationLoadThreshold: The load at or above which an idle connection will be brought
  ///     up; must be in the range `0.0 ... 1.0`.
  ///   - assumedMaxConcurrentStreams: The number of concurrent streams a connection is assumed to
  ///     support until its true value is known.
  ///   - connectionBackoff: Configuration for backoff between connection attempts.
  ///   - channelProvider: Provides channels to the underlying `ConnectionManager`s.
  ///   - streamLender: Notified about changes to stream reservations.
  ///   - delegate: An optional delegate notified about connection events.
  ///   - logger: A logger; this pool's ID is attached as logging metadata.
  ///   - now: Returns the current time; injectable for testing.
  @usableFromInline
  init(
    eventLoop: EventLoop,
    maxWaiters: Int,
    minConnections: Int,
    reservationLoadThreshold: Double,
    assumedMaxConcurrentStreams: Int,
    connectionBackoff: ConnectionBackoff,
    channelProvider: ConnectionManagerChannelProvider,
    streamLender: StreamLender,
    delegate: GRPCConnectionPoolDelegate?,
    logger: Logger,
    now: @escaping () -> NIODeadline = NIODeadline.now
  ) {
    // The threshold is a ratio, so it must lie within the unit interval.
    precondition(
      (0.0 ... 1.0).contains(reservationLoadThreshold),
      "reservationLoadThreshold must be within the range 0.0 ... 1.0"
    )

    self.reservationLoadThreshold = reservationLoadThreshold
    self.assumedMaxConcurrentStreams = assumedMaxConcurrentStreams

    self._connections = [:]
    self.maxWaiters = maxWaiters
    self.minConnections = minConnections
    self.waiters = CircularBuffer(initialCapacity: 16)

    self.eventLoop = eventLoop
    self.connectionBackoff = connectionBackoff
    self.channelProvider = channelProvider
    self.streamLender = streamLender
    self.delegate = delegate
    self.now = now

    // Attach this pool's ID to every message logged via the pool's logger.
    let id = GRPCSubPoolID.next()
    var logger = logger
    logger[metadataKey: Metadata.id] = "\(id)"

    self.id = id
    self.logger = logger
  }
208 | | |
209 | | /// Initialize the connection pool. |
210 | | /// |
211 | | /// - Parameter connections: The number of connections to add to the pool. |
212 | 0 | internal func initialize(connections: Int) { |
213 | 0 | assert(self._connections.isEmpty) |
214 | 0 | self.logger.debug( |
215 | 0 | "initializing new sub-pool", |
216 | 0 | metadata: [ |
217 | 0 | Metadata.waitersMax: .stringConvertible(self.maxWaiters), |
218 | 0 | Metadata.connectionsMax: .stringConvertible(connections), |
219 | 0 | ] |
220 | 0 | ) |
221 | 0 | self._connections.reserveCapacity(connections) |
222 | 0 | var numberOfKeepOpenConnections = self.minConnections |
223 | 0 | while self._connections.count < connections { |
224 | 0 | // If we have less than the minimum number of connections, don't let |
225 | 0 | // the new connection close when idle. |
226 | 0 | let idleBehavior = |
227 | 0 | numberOfKeepOpenConnections > 0 |
228 | 0 | ? ConnectionManager.IdleBehavior.neverGoIdle : .closeWhenIdleTimeout |
229 | 0 | numberOfKeepOpenConnections -= 1 |
230 | 0 | self.addConnectionToPool(idleBehavior: idleBehavior) |
231 | 0 | } |
232 | 0 | } |
233 | | |
234 | | /// Make and add a new connection to the pool. |
235 | 0 | private func addConnectionToPool(idleBehavior: ConnectionManager.IdleBehavior) { |
236 | 0 | let manager = ConnectionManager( |
237 | 0 | eventLoop: self.eventLoop, |
238 | 0 | channelProvider: self.channelProvider, |
239 | 0 | callStartBehavior: .waitsForConnectivity, |
240 | 0 | idleBehavior: idleBehavior, |
241 | 0 | connectionBackoff: self.connectionBackoff, |
242 | 0 | connectivityDelegate: self, |
243 | 0 | http2Delegate: self, |
244 | 0 | logger: self.logger |
245 | 0 | ) |
246 | 0 | let id = manager.id |
247 | 0 | self._connections[id] = PerConnectionState(manager: manager) |
248 | 0 | self.delegate?.connectionAdded(id: .init(id)) |
249 | 0 |
|
250 | 0 | // If it's one of the connections that should be kept open, then connect |
251 | 0 | // straight away. |
252 | 0 | switch idleBehavior { |
253 | 0 | case .neverGoIdle: |
254 | 0 | self.eventLoop.execute { |
255 | 0 | if manager.sync.isIdle { |
256 | 0 | manager.sync.startConnecting() |
257 | 0 | } |
258 | 0 | } |
259 | 0 | case .closeWhenIdleTimeout: |
260 | 0 | () |
261 | 0 | } |
262 | 0 | } |
263 | | |
264 | | // MARK: - Called from the pool manager |
265 | | |
266 | | /// Make and initialize an HTTP/2 stream `Channel`. |
267 | | /// |
268 | | /// - Parameters: |
269 | | /// - deadline: The point in time by which the `promise` must have been resolved. |
270 | | /// - promise: A promise for a `Channel`. |
271 | | /// - logger: A request logger. |
272 | | /// - initializer: A closure to initialize the `Channel` with. |
273 | | @inlinable |
274 | | internal func makeStream( |
275 | | deadline: NIODeadline, |
276 | | promise: EventLoopPromise<Channel>, |
277 | | logger: Logger, |
278 | | initializer: @escaping @Sendable (Channel) -> EventLoopFuture<Void> |
279 | 0 | ) { |
280 | 0 | if self.eventLoop.inEventLoop { |
281 | 0 | self._makeStream( |
282 | 0 | deadline: deadline, |
283 | 0 | promise: promise, |
284 | 0 | logger: logger, |
285 | 0 | initializer: initializer |
286 | 0 | ) |
287 | 0 | } else { |
288 | 0 | self.eventLoop.execute { |
289 | 0 | self._makeStream( |
290 | 0 | deadline: deadline, |
291 | 0 | promise: promise, |
292 | 0 | logger: logger, |
293 | 0 | initializer: initializer |
294 | 0 | ) |
295 | 0 | } |
296 | 0 | } |
297 | 0 | } |
298 | | |
299 | | /// See `makeStream(deadline:promise:logger:initializer:)`. |
300 | | @inlinable |
301 | | internal func makeStream( |
302 | | deadline: NIODeadline, |
303 | | logger: Logger, |
304 | | initializer: @escaping @Sendable (Channel) -> EventLoopFuture<Void> |
305 | 0 | ) -> EventLoopFuture<Channel> { |
306 | 0 | let promise = self.eventLoop.makePromise(of: Channel.self) |
307 | 0 | self.makeStream(deadline: deadline, promise: promise, logger: logger, initializer: initializer) |
308 | 0 | return promise.futureResult |
309 | 0 | } |
310 | | |
311 | | /// Shutdown the connection pool. |
312 | | /// |
313 | | /// Existing waiters will be failed and all underlying connections will be shutdown. Subsequent |
314 | | /// calls to `makeStream` will be failed immediately. |
315 | | /// |
316 | | /// - Parameter mode: The mode to use when shutting down. |
317 | | /// - Returns: A future indicated when shutdown has been completed. |
318 | 0 | internal func shutdown(mode: ConnectionManager.ShutdownMode) -> EventLoopFuture<Void> { |
319 | 0 | let promise = self.eventLoop.makePromise(of: Void.self) |
320 | 0 |
|
321 | 0 | if self.eventLoop.inEventLoop { |
322 | 0 | self._shutdown(mode: mode, promise: promise) |
323 | 0 | } else { |
324 | 0 | self.eventLoop.execute { |
325 | 0 | self._shutdown(mode: mode, promise: promise) |
326 | 0 | } |
327 | 0 | } |
328 | 0 |
|
329 | 0 | return promise.futureResult |
330 | 0 | } |
331 | | |
332 | | /// See `makeStream(deadline:promise:logger:initializer:)`. |
333 | | /// |
334 | | /// - Important: Must be called on the pool's `EventLoop`. |
335 | | @inlinable |
336 | | internal func _makeStream( |
337 | | deadline: NIODeadline, |
338 | | promise: EventLoopPromise<Channel>, |
339 | | logger: Logger, |
340 | | initializer: @escaping @Sendable (Channel) -> EventLoopFuture<Void> |
341 | 0 | ) { |
342 | 0 | self.eventLoop.assertInEventLoop() |
343 | 0 |
|
344 | 0 | guard case .active = self._state else { |
345 | 0 | // Fail the promise right away if we're shutting down or already shut down. |
346 | 0 | promise.fail(GRPCConnectionPoolError.shutdown) |
347 | 0 | return |
348 | 0 | } |
349 | 0 |
|
350 | 0 | // Try to make a stream on an existing connection. |
351 | 0 | let streamCreated = self._tryMakeStream(promise: promise, initializer: initializer) |
352 | 0 |
|
353 | 0 | if !streamCreated { |
354 | 0 | // No stream was created, wait for one. |
355 | 0 | self._enqueueWaiter( |
356 | 0 | deadline: deadline, |
357 | 0 | promise: promise, |
358 | 0 | logger: logger, |
359 | 0 | initializer: initializer |
360 | 0 | ) |
361 | 0 | } |
362 | 0 | } |
363 | | |
364 | | /// Try to find an existing connection on which we can make a stream. |
365 | | /// |
366 | | /// - Parameters: |
367 | | /// - promise: A promise to succeed if we can make a stream. |
368 | | /// - initializer: A closure to initialize the stream with. |
369 | | /// - Returns: A boolean value indicating whether the stream was created or not. |
370 | | @inlinable |
371 | | internal func _tryMakeStream( |
372 | | promise: EventLoopPromise<Channel>, |
373 | | initializer: @escaping @Sendable (Channel) -> EventLoopFuture<Void> |
374 | 0 | ) -> Bool { |
375 | 0 | // We shouldn't jump the queue. |
376 | 0 | guard self.waiters.isEmpty else { |
377 | 0 | return false |
378 | 0 | } |
379 | 0 |
|
380 | 0 | // Reserve a stream, if we can. |
381 | 0 | guard let multiplexer = self._reserveStreamFromMostAvailableConnection() else { |
382 | 0 | return false |
383 | 0 | } |
384 | 0 |
|
385 | 0 | multiplexer.createStreamChannel(promise: promise, initializer) |
386 | 0 |
|
387 | 0 | // Has reserving another stream tipped us over the limit for needing another connection? |
388 | 0 | if self._shouldBringUpAnotherConnection() { |
389 | 0 | self._startConnectingIdleConnection() |
390 | 0 | } |
391 | 0 |
|
392 | 0 | return true |
393 | 0 | } |
394 | | |
  /// Enqueue a waiter to be provided with a stream at some point in the future.
  ///
  /// - Parameters:
  ///   - deadline: The point in time by which the promise should have been completed.
  ///   - promise: The promise to complete with the `Channel`.
  ///   - logger: A logger.
  ///   - initializer: A closure to initialize the `Channel` with.
  @inlinable
  internal func _enqueueWaiter(
    deadline: NIODeadline,
    promise: EventLoopPromise<Channel>,
    logger: Logger,
    initializer: @escaping @Sendable (Channel) -> EventLoopFuture<Void>
  ) {
    // Don't overwhelm the pool with too many waiters.
    guard self.waiters.count < self.maxWaiters else {
      logger.trace(
        "connection pool has too many waiters",
        metadata: [
          Metadata.waitersMax: .stringConvertible(self.maxWaiters)
        ]
      )
      // Attach the most recent connection error (if any) so the caller knows why the pool
      // may be backed up.
      promise.fail(GRPCConnectionPoolError.tooManyWaiters(connectionError: self._mostRecentError))
      return
    }

    let waiter = Waiter(deadline: deadline, promise: promise, channelInitializer: initializer)

    // Fail the waiter and punt it from the queue when it times out. It's okay that we schedule the
    // timeout before appending it to the waiters, it won't run until the next event loop tick at
    // the earliest (even if the deadline has already passed).
    waiter.scheduleTimeout(on: self.eventLoop) {
      waiter.fail(GRPCConnectionPoolError.deadlineExceeded(connectionError: self._mostRecentError))

      // The waiter may already have been removed (e.g. it was serviced); only log when it was
      // actually still queued.
      if let index = self.waiters.firstIndex(where: { $0.id == waiter.id }) {
        self.waiters.remove(at: index)

        logger.trace(
          "timed out waiting for a connection",
          metadata: [
            Metadata.waiterID: "\(waiter.id)",
            Metadata.waitersCount: .stringConvertible(self.waiters.count),
          ]
        )
      }
    }

    // request logger (per-RPC context)
    logger.debug(
      "waiting for a connection to become available",
      metadata: [
        Metadata.waiterID: "\(waiter.id)",
        Metadata.waitersCount: .stringConvertible(self.waiters.count),
      ]
    )

    self.waiters.append(waiter)

    // pool logger (carries the pool's ID as metadata)
    self.logger.trace(
      "enqueued connection waiter",
      metadata: [
        Metadata.waitersCount: .stringConvertible(self.waiters.count)
      ]
    )

    // The extra demand from this waiter may push the load over the threshold, in which case
    // bring up an idle connection (if one exists).
    if self._shouldBringUpAnotherConnection() {
      self._startConnectingIdleConnection()
    }
  }
465 | | |
466 | | /// Compute the current demand and capacity for streams. |
467 | | /// |
468 | | /// The 'demand' for streams is the number of reserved streams and the number of waiters. The |
469 | | /// capacity for streams is the product of max concurrent streams and the number of non-idle |
470 | | /// connections. |
471 | | /// |
472 | | /// - Returns: A tuple of the demand and capacity for streams. |
473 | | @usableFromInline |
474 | 0 | internal func _computeStreamDemandAndCapacity() -> (demand: Int, capacity: Int) { |
475 | 0 | let demand = self.sync.reservedStreams + self.sync.waiters |
476 | 0 |
|
477 | 0 | // TODO: make this cheaper by storing and incrementally updating the number of idle connections |
478 | 0 | let capacity = self._connections.values.reduce(0) { sum, state in |
479 | 0 | if state.manager.sync.isIdle || state.isQuiescing { |
480 | 0 | // Idle connection or quiescing (so the capacity should be ignored). |
481 | 0 | return sum |
482 | 0 | } else if let knownMaxAvailableStreams = state.maxAvailableStreams { |
483 | 0 | // A known value of max concurrent streams, i.e. the connection is active. |
484 | 0 | return sum + knownMaxAvailableStreams |
485 | 0 | } else { |
486 | 0 | // Not idle and no known value, the connection must be connecting so use our assumed value. |
487 | 0 | return sum + self.assumedMaxConcurrentStreams |
488 | 0 | } |
489 | 0 | } |
490 | 0 |
|
491 | 0 | return (demand, capacity) |
492 | 0 | } |
493 | | |
494 | | /// Returns whether the pool should start connecting an idle connection (if one exists). |
495 | | @usableFromInline |
496 | 0 | internal func _shouldBringUpAnotherConnection() -> Bool { |
497 | 0 | let (demand, capacity) = self._computeStreamDemandAndCapacity() |
498 | 0 |
|
499 | 0 | // Infinite -- i.e. all connections are idle or no connections exist -- is okay here as it |
500 | 0 | // will always be greater than the threshold and a new connection will be spun up. |
501 | 0 | let load = Double(demand) / Double(capacity) |
502 | 0 | let loadExceedsThreshold = load >= self.reservationLoadThreshold |
503 | 0 |
|
504 | 0 | if loadExceedsThreshold { |
505 | 0 | self.logger.debug( |
506 | 0 | "stream reservation load factor greater than or equal to threshold, bringing up additional connection if available", |
507 | 0 | metadata: [ |
508 | 0 | Metadata.reservationsCount: .stringConvertible(demand), |
509 | 0 | Metadata.reservationsCapacity: .stringConvertible(capacity), |
510 | 0 | Metadata.reservationsLoad: .stringConvertible(load), |
511 | 0 | Metadata.reservationsLoadThreshold: .stringConvertible(self.reservationLoadThreshold), |
512 | 0 | ] |
513 | 0 | ) |
514 | 0 | } |
515 | 0 |
|
516 | 0 | return loadExceedsThreshold |
517 | 0 | } |
518 | | |
519 | | /// Starts connecting an idle connection, if one exists. |
520 | | @usableFromInline |
521 | 0 | internal func _startConnectingIdleConnection() { |
522 | 0 | if let index = self._connections.values.firstIndex(where: { $0.manager.sync.isIdle }) { |
523 | 0 | self._connections.values[index].manager.sync.startConnecting() |
524 | 0 | } else { |
525 | 0 | let connecting = self._connections.values.count { $0.manager.sync.isConnecting } |
526 | 0 | let ready = self._connections.values.count { $0.manager.sync.isReady } |
527 | 0 | let transientFailure = self._connections.values.count { $0.manager.sync.isTransientFailure } |
528 | 0 |
|
529 | 0 | self.logger.debug( |
530 | 0 | "no idle connections in pool", |
531 | 0 | metadata: [ |
532 | 0 | Metadata.connectionsConnecting: .stringConvertible(connecting), |
533 | 0 | Metadata.connectionsReady: .stringConvertible(ready), |
534 | 0 | Metadata.connectionsTransientFailure: .stringConvertible(transientFailure), |
535 | 0 | Metadata.waitersCount: .stringConvertible(self.waiters.count), |
536 | 0 | ] |
537 | 0 | ) |
538 | 0 | } |
539 | 0 | } |
540 | | |
541 | | /// Returns the index in `self.connections.values` of the connection with the most available |
542 | | /// streams. Returns `self.connections.endIndex` if no connection has at least one stream |
543 | | /// available. |
544 | | /// |
545 | | /// - Note: this is linear in the number of connections. |
546 | | @usableFromInline |
547 | | internal func _mostAvailableConnectionIndex() |
548 | | -> Dictionary<ConnectionManagerID, PerConnectionState>.Index |
549 | 0 | { |
550 | 0 | var index = self._connections.values.startIndex |
551 | 0 | var selectedIndex = self._connections.values.endIndex |
552 | 0 | var mostAvailableStreams = 0 |
553 | 0 |
|
554 | 0 | while index != self._connections.values.endIndex { |
555 | 0 | let availableStreams = self._connections.values[index].availableStreams |
556 | 0 | if availableStreams > mostAvailableStreams { |
557 | 0 | mostAvailableStreams = availableStreams |
558 | 0 | selectedIndex = index |
559 | 0 | } |
560 | 0 |
|
561 | 0 | self._connections.values.formIndex(after: &index) |
562 | 0 | } |
563 | 0 |
|
564 | 0 | return selectedIndex |
565 | 0 | } |
566 | | |
567 | | /// Reserves a stream from the connection with the most available streams, if one exists. |
568 | | /// |
569 | | /// - Returns: The `HTTP2StreamMultiplexer` from the connection the stream was reserved from, |
570 | | /// or `nil` if no stream could be reserved. |
571 | | @usableFromInline |
572 | 0 | internal func _reserveStreamFromMostAvailableConnection() -> HTTP2StreamMultiplexer? { |
573 | 0 | let index = self._mostAvailableConnectionIndex() |
574 | 0 |
|
575 | 0 | if index != self._connections.endIndex { |
576 | 0 | // '!' is okay here; the most available connection must have at least one stream available |
577 | 0 | // to reserve. |
578 | 0 | return self._connections.values[index].reserveStream()! |
579 | 0 | } else { |
580 | 0 | return nil |
581 | 0 | } |
582 | 0 | } |
583 | | |
  /// See `shutdown(mode:)`.
  ///
  /// - Important: Must be called on the pool's `EventLoop`.
  /// - Parameter promise: A `promise` to complete when the pool has been shutdown.
  @usableFromInline
  internal func _shutdown(mode: ConnectionManager.ShutdownMode, promise: EventLoopPromise<Void>) {
    self.eventLoop.assertInEventLoop()

    switch self._state {
    case .active:
      self.logger.debug("shutting down connection pool")

      // We're shutting down now and when that's done we'll be fully shutdown.
      self._state = .shuttingDown(promise.futureResult)
      promise.futureResult.whenComplete { _ in
        self._state = .shutdown
        self.delegate = nil
        self.logger.trace("finished shutting down connection pool")
      }

      // Shutdown all the connections and remove them from the pool.
      let connections = self._connections
      self._connections.removeAll()

      let allShutdown: [EventLoopFuture<Void>] = connections.values.map {
        let id = $0.manager.id
        let manager = $0.manager

        return manager.eventLoop.flatSubmit {
          // If the connection was idle/shutdown before calling shutdown then we shouldn't tell
          // the delegate the connection closed (because it either never connected or was already
          // informed about this).
          let connectionIsInactive = manager.sync.isIdle || manager.sync.isShutdown
          return manager.shutdown(mode: mode).always { _ in
            if !connectionIsInactive {
              self.delegate?.connectionClosed(id: .init(id), error: nil)
            }
            // Removal is reported for every connection, regardless of whether it was active.
            self.delegate?.connectionRemoved(id: .init(id))
          }
        }
      }

      // Fail the outstanding waiters.
      while let waiter = self.waiters.popFirst() {
        waiter.fail(GRPCConnectionPoolError.shutdown)
      }

      // Cascade the result of the shutdown into the promise.
      EventLoopFuture.andAllSucceed(allShutdown, promise: promise)

    case let .shuttingDown(future):
      // We're already shutting down, cascade the result.
      promise.completeWith(future)

    case .shutdown:
      // Already shutdown, fine.
      promise.succeed(())
    }
  }
642 | | |
643 | 0 | internal func stats() -> EventLoopFuture<GRPCSubPoolStats> { |
644 | 0 | let promise = self.eventLoop.makePromise(of: GRPCSubPoolStats.self) |
645 | 0 |
|
646 | 0 | if self.eventLoop.inEventLoop { |
647 | 0 | self._stats(promise: promise) |
648 | 0 | } else { |
649 | 0 | self.eventLoop.execute { |
650 | 0 | self._stats(promise: promise) |
651 | 0 | } |
652 | 0 | } |
653 | 0 |
|
654 | 0 | return promise.futureResult |
655 | 0 | } |
656 | | |
657 | 0 | private func _stats(promise: EventLoopPromise<GRPCSubPoolStats>) { |
658 | 0 | self.eventLoop.assertInEventLoop() |
659 | 0 |
|
660 | 0 | var stats = GRPCSubPoolStats(id: self.id) |
661 | 0 |
|
662 | 0 | for connection in self._connections.values { |
663 | 0 | let sync = connection.manager.sync |
664 | 0 | if sync.isIdle { |
665 | 0 | stats.connectionStates.idle += 1 |
666 | 0 | } else if sync.isConnecting { |
667 | 0 | stats.connectionStates.connecting += 1 |
668 | 0 | } else if sync.isReady { |
669 | 0 | stats.connectionStates.ready += 1 |
670 | 0 | } else if sync.isTransientFailure { |
671 | 0 | stats.connectionStates.transientFailure += 1 |
672 | 0 | } |
673 | 0 |
|
674 | 0 | stats.streamsInUse += connection.reservedStreams |
675 | 0 | stats.streamsFreeToUse += connection.availableStreams |
676 | 0 | } |
677 | 0 |
|
678 | 0 | stats.rpcsWaiting += self.waiters.count |
679 | 0 |
|
680 | 0 | promise.succeed(stats) |
681 | 0 | } |
682 | | } |
683 | | |
extension ConnectionPool: ConnectionManagerConnectivityDelegate {
  // We're interested in a few different situations here:
  //
  // 1. The connection was usable ('ready') and is no longer usable (either it became idle or
  //    encountered an error). If this happens we need to notify any connections of the change as
  //    they may no longer be used for new RPCs.
  // 2. The connection was not usable but moved to a different unusable state. If this happens and
  //    we know the cause of the state transition (i.e. the error) then we need to update our most
  //    recent error with the error. This information is used when failing waiters to provide some
  //    context as to why they may be failing.
  func connectionStateDidChange(
    _ manager: ConnectionManager,
    from oldState: _ConnectivityState,
    to newState: _ConnectivityState
  ) {
    // First pass: update the pool's own view (most recent error and availability).
    switch (oldState, newState) {
    case let (.ready, .transientFailure(error)),
      let (.ready, .idle(.some(error))):
      self.updateMostRecentError(error)
      self.connectionUnavailable(manager.id)

    case (.ready, .idle(.none)),
      (.ready, .shutdown):
      self.connectionUnavailable(manager.id)

    case let (_, .transientFailure(error)),
      let (_, .idle(.some(error))):
      self.updateMostRecentError(error)

    default:
      ()
    }

    // Second pass: translate the transition into pool delegate events, if a delegate is set.
    guard let delegate = self.delegate else { return }

    switch (oldState, newState) {
    case (.idle, .connecting),
      (.transientFailure, .connecting):
      delegate.startedConnecting(id: .init(manager.id))

    case (.connecting, .ready):
      // The connection becoming ready is handled by 'receivedSettingsMaxConcurrentStreams'.
      ()

    case (.ready, .idle):
      delegate.connectionClosed(id: .init(manager.id), error: nil)

    case let (.ready, .transientFailure(error)):
      delegate.connectionClosed(id: .init(manager.id), error: error)

    case let (.connecting, .transientFailure(error)):
      delegate.connectFailed(id: .init(manager.id), error: error)

    default:
      ()
    }
  }
741 | | |
  func connectionIsQuiescing(_ manager: ConnectionManager) {
    self.eventLoop.assertInEventLoop()

    // Find the relevant connection; bail out if it isn't one of ours.
    guard let index = self._connections.index(forKey: manager.id) else {
      return
    }

    // Drop the connectivity delegate, we're no longer interested in its events now.
    manager.sync.connectivityDelegate = nil

    // Started quiescing; update our state and notify the pool delegate.
    self._connections.values[index].isQuiescing = true
    self.delegate?.connectionQuiescing(id: .init(manager.id))

    // As the connection is quiescing, we need to know when the current connection it's managing
    // has closed. When that happens drop the H2 delegate and update the pool delegate.
    manager.onCurrentConnectionClose { hadActiveConnection in
      assert(hadActiveConnection)
      if let removed = self._connections.removeValue(forKey: manager.id) {
        removed.manager.sync.http2Delegate = nil
        // NOTE(review): shutting the manager down after its connection has already closed looks
        // like belt-and-braces teardown of remaining manager state — confirm it's safe/idempotent
        // at this point.
        removed.manager.sync.shutdownNow() // Manager may have internal state to tear down.
        self.delegate?.connectionClosed(id: .init(removed.manager.id), error: nil)
        self.delegate?.connectionRemoved(id: .init(removed.manager.id))
      }
    }

    // Grab the number of reserved streams (before invalidating the index by adding a connection).
    let reservedStreams = self._connections.values[index].reservedStreams

    // Replace the connection with a new idle one. Keep the idle behavior, so that
    // if it's a connection that should be kept alive, we maintain it.
    self.addConnectionToPool(idleBehavior: manager.idleBehavior)

    // Since we're removing this connection from the pool (and no new streams can be created on
    // the connection), the pool manager can ignore any streams reserved against this connection.
    // We do still care about the number of reserved streams for the connection though.
    //
    // Note: we don't need to adjust the number of available streams as the effective number of
    // connections hasn't changed.
    self.streamLender.returnStreams(reservedStreams, to: self)
  }
784 | | |
785 | 0 | private func updateMostRecentError(_ error: Error) { |
786 | 0 | self.eventLoop.assertInEventLoop() |
787 | 0 | // Update the last known error if there is one. We will use it to provide some context to |
788 | 0 | // waiters which may fail. |
789 | 0 | self._mostRecentError = error |
790 | 0 | } |
791 | | |
792 | | /// A connection has become unavailable. |
793 | 0 | private func connectionUnavailable(_ id: ConnectionManagerID) { |
794 | 0 | self.eventLoop.assertInEventLoop() |
795 | 0 | // The connection is no longer available: any streams which haven't been closed will be counted |
796 | 0 | // as a dropped reservation, we need to tell the pool manager about them. |
797 | 0 | if let droppedReservations = self._connections[id]?.unavailable(), droppedReservations > 0 { |
798 | 0 | self.streamLender.returnStreams(droppedReservations, to: self) |
799 | 0 | } |
800 | 0 | } |
801 | | } |
802 | | |
extension ConnectionPool: ConnectionManagerHTTP2Delegate {
  /// An HTTP/2 stream was opened on the given connection: update the per-connection stream count
  /// and notify the pool delegate of the utilization change.
  internal func streamOpened(_ manager: ConnectionManager) {
    self.eventLoop.assertInEventLoop()
    if let utilization = self._connections[manager.id]?.openedStream(),
      let delegate = self.delegate
    {
      delegate.connectionUtilizationChanged(
        id: .init(manager.id),
        streamsUsed: utilization.used,
        streamCapacity: utilization.capacity
      )
    }
  }

  /// An HTTP/2 stream was closed on the given connection: return the stream to the connection
  /// and, where appropriate, to the pool manager.
  internal func streamClosed(_ manager: ConnectionManager) {
    self.eventLoop.assertInEventLoop()

    guard let index = self._connections.index(forKey: manager.id) else {
      return
    }

    // Return the stream to the connection and notify the pool delegate of the utilization change.
    if let utilization = self._connections.values[index].returnStream(),
      let delegate = self.delegate
    {
      delegate.connectionUtilizationChanged(
        id: .init(manager.id),
        streamsUsed: utilization.used,
        streamCapacity: utilization.capacity
      )
    }

    // Return the stream to the pool manager if the connection is available and not quiescing. For
    // quiescing connections streams were returned when the connection started quiescing.
    if self._connections.values[index].isAvailable, !self._connections.values[index].isQuiescing {
      self.streamLender.returnStreams(1, to: self)

      // A stream was returned: we may be able to service a waiter now.
      self.tryServiceWaiters()
    }
  }

  /// The connection received an HTTP/2 SETTINGS frame carrying a max-concurrent-streams value:
  /// reconcile the pool's stream capacity accounting with the new value.
  internal func receivedSettingsMaxConcurrentStreams(
    _ manager: ConnectionManager,
    maxConcurrentStreams: Int
  ) {
    self.eventLoop.assertInEventLoop()

    // Find the relevant connection.
    guard let index = self._connections.index(forKey: manager.id) else {
      return
    }

    // When the connection is quiescing, the pool manager is not interested in updates to the
    // connection, bail out early.
    if self._connections.values[index].isQuiescing {
      return
    }

    // If we received a SETTINGS update then a connection is okay: drop the last known error.
    self._mostRecentError = nil

    let previous = self._connections.values[index].updateMaxConcurrentStreams(maxConcurrentStreams)
    let delta: Int

    if let previousValue = previous {
      // There was a previous value of max concurrent streams, i.e. a change in value for an
      // existing connection.
      delta = maxConcurrentStreams - previousValue
    } else {
      // There was no previous value so this must be a new connection. We'll compare against our
      // assumed default.
      delta = maxConcurrentStreams - self.assumedMaxConcurrentStreams
      // Notify the delegate.
      self.delegate?.connectSucceeded(id: .init(manager.id), streamCapacity: maxConcurrentStreams)
    }

    if delta != 0 {
      self.streamLender.changeStreamCapacity(by: delta, for: self)
    }

    // We always check, even if `delta` isn't greater than zero as this might be a new connection.
    self.tryServiceWaiters()
  }
}
888 | | |
extension ConnectionPool {
  // MARK: - Waiters

  /// Complete as many outstanding waiters as the pool currently has stream capacity for.
  ///
  /// This is an expensive operation; the worst case is `O(W ⨉ N)` where `W` is the number of
  /// waiters and `N` is the number of connections.
  private func tryServiceWaiters() {
    guard !self.waiters.isEmpty else { return }

    self.logger.trace(
      "servicing waiters",
      metadata: [
        Metadata.waitersCount: .stringConvertible(self.waiters.count)
      ]
    )

    let now = self.now()
    var serviced = 0

    while let waiter = self.waiters.first {
      guard waiter.deadlineIsAfter(now) else {
        // The waiter's deadline has already expired so there's no point completing it. Drop it
        // and let its scheduled timeout fail the promise.
        self.waiters.removeFirst()
        continue
      }

      guard let multiplexer = self._reserveStreamFromMostAvailableConnection() else {
        // Waiters remain but no connection has spare capacity; we're done.
        break
      }

      // The deadline is in the future and we reserved a stream: complete the waiter.
      self.waiters.removeFirst()
      serviced &+= 1
      waiter.succeed(with: multiplexer)
    }

    self.logger.trace(
      "done servicing waiters",
      metadata: [
        Metadata.waitersCount: .stringConvertible(self.waiters.count),
        Metadata.waitersServiced: .stringConvertible(serviced),
      ]
    )
  }
}
937 | | |
extension ConnectionPool {
  /// Synchronous operations for the pool, mostly used by tests.
  internal struct Sync {
    private let pool: ConnectionPool

    fileprivate init(_ pool: ConnectionPool) {
      self.pool = pool
    }

    /// The number of outstanding connection waiters.
    internal var waiters: Int {
      self.pool.eventLoop.assertInEventLoop()
      return self.pool.waiters.count
    }

    /// The number of connections currently in the pool, in any state.
    internal var connections: Int {
      self.pool.eventLoop.assertInEventLoop()
      return self.pool._connections.count
    }

    /// The number of idle connections in the pool.
    internal var idleConnections: Int {
      self.pool.eventLoop.assertInEventLoop()
      return self.pool._connections.values.filter { $0.manager.sync.isIdle }.count
    }

    /// The number of active (i.e. connecting or ready) connections in the pool.
    internal var activeConnections: Int {
      self.pool.eventLoop.assertInEventLoop()
      return self.pool._connections.values.filter {
        $0.manager.sync.isReady || $0.manager.sync.isConnecting
      }.count
    }

    /// The number of connections in the pool in the transient failure state.
    internal var transientFailureConnections: Int {
      self.pool.eventLoop.assertInEventLoop()
      return self.pool._connections.values.filter {
        $0.manager.sync.isTransientFailure
      }.count
    }

    /// The total number of streams available to reserve across all connections in the pool.
    internal var availableStreams: Int {
      self.pool.eventLoop.assertInEventLoop()
      return self.pool._connections.values.map(\.availableStreams).reduce(0, +)
    }

    /// The total number of streams reserved across all connections in the pool.
    internal var reservedStreams: Int {
      self.pool.eventLoop.assertInEventLoop()
      return self.pool._connections.values.map(\.reservedStreams).reduce(0, +)
    }

    /// Updates the most recent connection error.
    internal func updateMostRecentError(_ error: Error) {
      self.pool.eventLoop.assertInEventLoop()
      self.pool.updateMostRecentError(error)
    }
  }

  /// A view over the pool offering synchronous operations; must be used on the pool's event loop.
  internal var sync: Sync {
    return Sync(self)
  }
}
1004 | | |
/// An error thrown from the ``GRPCChannelPool``.
public struct GRPCConnectionPoolError: Error, CustomStringConvertible {
  public struct Code: Hashable, Sendable, CustomStringConvertible {
    // The underlying error code. Wrapped in a struct so new codes can be added without breaking
    // the public API.
    enum Code {
      case shutdown
      case tooManyWaiters
      case deadlineExceeded
    }

    fileprivate var code: Code

    private init(_ code: Code) {
      self.code = code
    }

    public var description: String {
      return "\(self.code)"
    }

    /// The pool is shutdown or shutting down.
    public static var shutdown: Self { Self(.shutdown) }

    /// There are too many waiters in the pool.
    public static var tooManyWaiters: Self { Self(.tooManyWaiters) }

    /// The deadline for creating a stream has passed.
    public static var deadlineExceeded: Self { Self(.deadlineExceeded) }
  }

  /// The error code.
  public var code: Code

  /// An underlying error which caused this error to be thrown.
  public var underlyingError: Error?

  public var description: String {
    guard let underlyingError = self.underlyingError else {
      return "\(self.code)"
    }
    return "\(self.code) (\(underlyingError))"
  }

  /// Create a new connection pool error with the given code and underlying error.
  ///
  /// - Parameters:
  ///   - code: The error code.
  ///   - underlyingError: The underlying error which led to this error being thrown.
  public init(code: Code, underlyingError: Error? = nil) {
    self.code = code
    self.underlyingError = underlyingError
  }
}
1058 | | |
extension GRPCConnectionPoolError {
  /// A shared instance of the common no-cause shutdown error.
  @usableFromInline
  static let shutdown = Self(code: .shutdown)

  /// Returns a 'too many waiters' error, wrapping the most recent connection error, if any.
  @inlinable
  static func tooManyWaiters(connectionError: Error?) -> Self {
    return GRPCConnectionPoolError(code: .tooManyWaiters, underlyingError: connectionError)
  }

  /// Returns a 'deadline exceeded' error, wrapping the most recent connection error, if any.
  @inlinable
  static func deadlineExceeded(connectionError: Error?) -> Self {
    return GRPCConnectionPoolError(code: .deadlineExceeded, underlyingError: connectionError)
  }
}
1073 | | |
extension GRPCConnectionPoolError: GRPCStatusTransformable {
  /// Map the pool error onto an RPC status, carrying the underlying error (if any) as its cause.
  public func makeGRPCStatus() -> GRPCStatus {
    let statusCode: GRPCStatus.Code
    let message: String

    switch self.code.code {
    case .shutdown:
      statusCode = .unavailable
      message = "The connection pool is shutdown"

    case .tooManyWaiters:
      statusCode = .resourceExhausted
      message = "The connection pool has no capacity for new RPCs or RPC waiters"

    case .deadlineExceeded:
      statusCode = .deadlineExceeded
      message = "Timed out waiting for an HTTP/2 stream from the connection pool"
    }

    return GRPCStatus(code: statusCode, message: message, cause: self.underlyingError)
  }
}
1100 | | |
extension Sequence {
  /// Returns the number of elements in the sequence satisfying `predicate`.
  fileprivate func count(where predicate: (Element) -> Bool) -> Int {
    var total = 0
    for element in self where predicate(element) {
      total += 1
    }
    return total
  }
}