Coverage Report

Created: 2026-03-11 06:25

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/grpc-swift/Sources/GRPC/ConnectionPool/PooledChannel.swift
Line
Count
Source
1
/*
2
 * Copyright 2021, gRPC Authors All rights reserved.
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
import Logging
17
import NIOCore
18
import NIOHTTP2
19
import SwiftProtobuf
20
21
#if canImport(NIOSSL)
22
import NIOSSL
23
#endif
24
25
@usableFromInline
26
internal final class PooledChannel: GRPCChannel {
27
  @usableFromInline
28
  internal let _configuration: GRPCChannelPool.Configuration
29
  @usableFromInline
30
  internal let _pool: PoolManager
31
  @usableFromInline
32
  internal let _authority: String
33
  @usableFromInline
34
  internal let _scheme: String
35
36
  @inlinable
37
0
  internal init(configuration: GRPCChannelPool.Configuration) throws {
38
0
    self._configuration = configuration
39
0
    self._authority = configuration.target.host
40
0
41
0
    let tlsMode: DefaultChannelProvider.TLSMode
42
0
    let scheme: String
43
0
44
0
    if let tlsConfiguration = configuration.transportSecurity.tlsConfiguration {
45
0
      scheme = "https"
46
      #if canImport(NIOSSL)
47
0
      if let sslContext = try tlsConfiguration.makeNIOSSLContext() {
48
0
        tlsMode = .configureWithNIOSSL(.success(sslContext))
49
0
      } else {
50
        #if canImport(Network)
51
        // - TLS is configured
52
        // - NIOSSL is available but we aren't using it
53
        // - Network.framework is available, we MUST be using that.
54
        tlsMode = .configureWithNetworkFramework
55
        #else
56
0
        // - TLS is configured
57
0
        // - NIOSSL is available but we aren't using it
58
0
        // - Network.framework is not available
59
0
        // NIOSSL or Network.framework must be available as TLS is configured.
60
0
        fatalError()
61
        #endif
62
0
      }
63
      #elseif canImport(Network)
64
      // - TLS is configured
65
      // - NIOSSL is not available
66
      // - Network.framework is available, we MUST be using that.
67
      tlsMode = .configureWithNetworkFramework
68
      #else
69
      // - TLS is configured
70
      // - NIOSSL is not available
71
      // - Network.framework is not available
72
      // NIOSSL or Network.framework must be available as TLS is configured.
73
      fatalError()
74
      #endif  // canImport(NIOSSL)
75
0
    } else {
76
0
      scheme = "http"
77
0
      tlsMode = .disabled
78
0
    }
79
0
80
0
    self._scheme = scheme
81
0
82
0
    let provider: DefaultChannelProvider
83
    #if canImport(Network)
84
    if #available(macOS 10.14, iOS 12.0, watchOS 6.0, tvOS 12.0, *) {
85
      provider = DefaultChannelProvider(
86
        connectionTarget: configuration.target,
87
        connectionKeepalive: configuration.keepalive,
88
        connectionIdleTimeout: configuration.idleTimeout,
89
        connectionMaxAge: configuration.maxConnectionAge,
90
        tlsMode: tlsMode,
91
        tlsConfiguration: configuration.transportSecurity.tlsConfiguration,
92
        httpTargetWindowSize: configuration.http2.targetWindowSize,
93
        httpMaxFrameSize: configuration.http2.maxFrameSize,
94
        httpMaxResetStreams: configuration.http2.maxResetStreams,
95
        errorDelegate: configuration.errorDelegate,
96
        debugChannelInitializer: configuration.debugChannelInitializer,
97
        nwParametersConfigurator: configuration.transportServices.nwParametersConfigurator
98
      )
99
    } else {
100
      provider = DefaultChannelProvider(
101
        connectionTarget: configuration.target,
102
        connectionKeepalive: configuration.keepalive,
103
        connectionIdleTimeout: configuration.idleTimeout,
104
        connectionMaxAge: configuration.maxConnectionAge,
105
        tlsMode: tlsMode,
106
        tlsConfiguration: configuration.transportSecurity.tlsConfiguration,
107
        httpTargetWindowSize: configuration.http2.targetWindowSize,
108
        httpMaxFrameSize: configuration.http2.maxFrameSize,
109
        httpMaxResetStreams: configuration.http2.maxResetStreams,
110
        errorDelegate: configuration.errorDelegate,
111
        debugChannelInitializer: configuration.debugChannelInitializer
112
      )
113
    }
114
    #else
115
0
    provider = DefaultChannelProvider(
116
0
      connectionTarget: configuration.target,
117
0
      connectionKeepalive: configuration.keepalive,
118
0
      connectionIdleTimeout: configuration.idleTimeout,
119
0
      connectionMaxAge: configuration.maxConnectionAge,
120
0
      tlsMode: tlsMode,
121
0
      tlsConfiguration: configuration.transportSecurity.tlsConfiguration,
122
0
      httpTargetWindowSize: configuration.http2.targetWindowSize,
123
0
      httpMaxFrameSize: configuration.http2.maxFrameSize,
124
0
      httpMaxResetStreams: configuration.http2.maxResetStreams,
125
0
      errorDelegate: configuration.errorDelegate,
126
0
      debugChannelInitializer: configuration.debugChannelInitializer
127
0
    )
128
    #endif
129
0
130
0
    self._pool = PoolManager.makeInitializedPoolManager(
131
0
      using: configuration.eventLoopGroup,
132
0
      perPoolConfiguration: .init(
133
0
        maxConnections: configuration.connectionPool.connectionsPerEventLoop,
134
0
        maxWaiters: configuration.connectionPool.maxWaitersPerEventLoop,
135
0
        minConnections: configuration.connectionPool.minConnectionsPerEventLoop,
136
0
        loadThreshold: configuration.connectionPool.reservationLoadThreshold,
137
0
        assumedMaxConcurrentStreams: 100,
138
0
        connectionBackoff: configuration.connectionBackoff,
139
0
        channelProvider: provider,
140
0
        delegate: configuration.delegate,
141
0
        statsPeriod: configuration.statsPeriod
142
0
      ),
143
0
      logger: configuration.backgroundActivityLogger
144
0
    )
145
0
  }
146
147
  @inlinable
148
  internal func _makeStreamChannel(
149
    callOptions: CallOptions
150
0
  ) -> (EventLoopFuture<Channel>, EventLoop) {
151
0
    let preferredEventLoop = callOptions.eventLoopPreference.exact
152
0
    let connectionWaitDeadline = NIODeadline.now() + self._configuration.connectionPool.maxWaitTime
153
0
    let deadline = min(callOptions.timeLimit.makeDeadline(), connectionWaitDeadline)
154
0
155
0
    let streamChannel = self._pool.makeStream(
156
0
      preferredEventLoop: preferredEventLoop,
157
0
      deadline: deadline,
158
0
      logger: callOptions.logger
159
0
    ) { channel in
160
0
      return channel.eventLoop.makeSucceededVoidFuture()
161
0
    }
162
0
163
0
    return (streamChannel.futureResult, preferredEventLoop ?? streamChannel.eventLoop)
164
0
  }
165
166
  // MARK: GRPCChannel conformance
167
168
  @inlinable
169
  internal func makeCall<Request, Response>(
170
    path: String,
171
    type: GRPCCallType,
172
    callOptions: CallOptions,
173
    interceptors: [ClientInterceptor<Request, Response>]
174
0
  ) -> Call<Request, Response> where Request: Message, Response: Message {
175
0
    var callOptions = callOptions
176
0
    if let requestID = callOptions.requestIDProvider.requestID() {
177
0
      callOptions.applyRequestID(requestID)
178
0
    }
179
0
180
0
    let (stream, eventLoop) = self._makeStreamChannel(callOptions: callOptions)
181
0
182
0
    return Call(
183
0
      path: path,
184
0
      type: type,
185
0
      eventLoop: eventLoop,
186
0
      options: callOptions,
187
0
      interceptors: interceptors,
188
0
      transportFactory: .http2(
189
0
        channel: stream,
190
0
        authority: self._authority,
191
0
        scheme: self._scheme,
192
0
        maximumReceiveMessageLength: self._configuration.maximumReceiveMessageLength,
193
0
        errorDelegate: self._configuration.errorDelegate
194
0
      )
195
0
    )
196
0
  }
197
198
  @inlinable
199
  internal func makeCall<Request, Response>(
200
    path: String,
201
    type: GRPCCallType,
202
    callOptions: CallOptions,
203
    interceptors: [ClientInterceptor<Request, Response>]
204
0
  ) -> Call<Request, Response> where Request: GRPCPayload, Response: GRPCPayload {
205
0
    var callOptions = callOptions
206
0
    if let requestID = callOptions.requestIDProvider.requestID() {
207
0
      callOptions.applyRequestID(requestID)
208
0
    }
209
0
210
0
    let (stream, eventLoop) = self._makeStreamChannel(callOptions: callOptions)
211
0
212
0
    return Call(
213
0
      path: path,
214
0
      type: type,
215
0
      eventLoop: eventLoop,
216
0
      options: callOptions,
217
0
      interceptors: interceptors,
218
0
      transportFactory: .http2(
219
0
        channel: stream,
220
0
        authority: self._authority,
221
0
        scheme: self._scheme,
222
0
        maximumReceiveMessageLength: self._configuration.maximumReceiveMessageLength,
223
0
        errorDelegate: self._configuration.errorDelegate
224
0
      )
225
0
    )
226
0
  }
227
228
  @inlinable
229
0
  internal func close(promise: EventLoopPromise<Void>) {
230
0
    self._pool.shutdown(mode: .forceful, promise: promise)
231
0
  }
232
233
  @inlinable
234
0
  internal func close() -> EventLoopFuture<Void> {
235
0
    let promise = self._configuration.eventLoopGroup.next().makePromise(of: Void.self)
236
0
    self.close(promise: promise)
237
0
    return promise.futureResult
238
0
  }
239
240
  @usableFromInline
241
0
  internal func closeGracefully(deadline: NIODeadline, promise: EventLoopPromise<Void>) {
242
0
    self._pool.shutdown(mode: .graceful(deadline), promise: promise)
243
0
  }
244
}
245
246
extension CallOptions {
247
  @usableFromInline
248
0
  mutating func applyRequestID(_ requestID: String) {
249
0
    self.logger[metadataKey: MetadataKey.requestID] = "\(requestID)"
250
0
    // Add the request ID header too.
251
0
    if let requestIDHeader = self.requestIDHeader {
252
0
      self.customMetadata.add(name: requestIDHeader, value: requestID)
253
0
    }
254
0
  }
255
}