/src/grpc-swift/Sources/GRPC/ConnectionPool/PooledChannel.swift
Line | Count | Source |
1 | | /* |
2 | | * Copyright 2021, gRPC Authors All rights reserved. |
3 | | * |
4 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | | * you may not use this file except in compliance with the License. |
6 | | * You may obtain a copy of the License at |
7 | | * |
8 | | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | | * |
10 | | * Unless required by applicable law or agreed to in writing, software |
11 | | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | | * See the License for the specific language governing permissions and |
14 | | * limitations under the License. |
15 | | */ |
16 | | import Logging |
17 | | import NIOCore |
18 | | import NIOHTTP2 |
19 | | import SwiftProtobuf |
20 | | |
21 | | #if canImport(NIOSSL) |
22 | | import NIOSSL |
23 | | #endif |
24 | | |
25 | | @usableFromInline |
26 | | internal final class PooledChannel: GRPCChannel { |
27 | | @usableFromInline |
28 | | internal let _configuration: GRPCChannelPool.Configuration |
29 | | @usableFromInline |
30 | | internal let _pool: PoolManager |
31 | | @usableFromInline |
32 | | internal let _authority: String |
33 | | @usableFromInline |
34 | | internal let _scheme: String |
35 | | |
36 | | @inlinable |
37 | 0 | internal init(configuration: GRPCChannelPool.Configuration) throws { |
38 | 0 | self._configuration = configuration |
39 | 0 | self._authority = configuration.target.host |
40 | 0 |
|
41 | 0 | let tlsMode: DefaultChannelProvider.TLSMode |
42 | 0 | let scheme: String |
43 | 0 |
|
44 | 0 | if let tlsConfiguration = configuration.transportSecurity.tlsConfiguration { |
45 | 0 | scheme = "https" |
46 | | #if canImport(NIOSSL) |
47 | 0 | if let sslContext = try tlsConfiguration.makeNIOSSLContext() { |
48 | 0 | tlsMode = .configureWithNIOSSL(.success(sslContext)) |
49 | 0 | } else { |
50 | | #if canImport(Network) |
51 | | // - TLS is configured |
52 | | // - NIOSSL is available but we aren't using it |
53 | | // - Network.framework is available, we MUST be using that. |
54 | | tlsMode = .configureWithNetworkFramework |
55 | | #else |
56 | 0 | // - TLS is configured |
57 | 0 | // - NIOSSL is available but we aren't using it |
58 | 0 | // - Network.framework is not available |
59 | 0 | // NIOSSL or Network.framework must be available as TLS is configured. |
60 | 0 | fatalError() |
61 | | #endif |
62 | 0 | } |
63 | | #elseif canImport(Network) |
64 | | // - TLS is configured |
65 | | // - NIOSSL is not available |
66 | | // - Network.framework is available, we MUST be using that. |
67 | | tlsMode = .configureWithNetworkFramework |
68 | | #else |
69 | | // - TLS is configured |
70 | | // - NIOSSL is not available |
71 | | // - Network.framework is not available |
72 | | // NIOSSL or Network.framework must be available as TLS is configured. |
73 | | fatalError() |
74 | | #endif // canImport(NIOSSL) |
75 | 0 | } else { |
76 | 0 | scheme = "http" |
77 | 0 | tlsMode = .disabled |
78 | 0 | } |
79 | 0 |
|
80 | 0 | self._scheme = scheme |
81 | 0 |
|
82 | 0 | let provider: DefaultChannelProvider |
83 | | #if canImport(Network) |
84 | | if #available(macOS 10.14, iOS 12.0, watchOS 6.0, tvOS 12.0, *) { |
85 | | provider = DefaultChannelProvider( |
86 | | connectionTarget: configuration.target, |
87 | | connectionKeepalive: configuration.keepalive, |
88 | | connectionIdleTimeout: configuration.idleTimeout, |
89 | | connectionMaxAge: configuration.maxConnectionAge, |
90 | | tlsMode: tlsMode, |
91 | | tlsConfiguration: configuration.transportSecurity.tlsConfiguration, |
92 | | httpTargetWindowSize: configuration.http2.targetWindowSize, |
93 | | httpMaxFrameSize: configuration.http2.maxFrameSize, |
94 | | httpMaxResetStreams: configuration.http2.maxResetStreams, |
95 | | errorDelegate: configuration.errorDelegate, |
96 | | debugChannelInitializer: configuration.debugChannelInitializer, |
97 | | nwParametersConfigurator: configuration.transportServices.nwParametersConfigurator |
98 | | ) |
99 | | } else { |
100 | | provider = DefaultChannelProvider( |
101 | | connectionTarget: configuration.target, |
102 | | connectionKeepalive: configuration.keepalive, |
103 | | connectionIdleTimeout: configuration.idleTimeout, |
104 | | connectionMaxAge: configuration.maxConnectionAge, |
105 | | tlsMode: tlsMode, |
106 | | tlsConfiguration: configuration.transportSecurity.tlsConfiguration, |
107 | | httpTargetWindowSize: configuration.http2.targetWindowSize, |
108 | | httpMaxFrameSize: configuration.http2.maxFrameSize, |
109 | | httpMaxResetStreams: configuration.http2.maxResetStreams, |
110 | | errorDelegate: configuration.errorDelegate, |
111 | | debugChannelInitializer: configuration.debugChannelInitializer |
112 | | ) |
113 | | } |
114 | | #else |
115 | 0 | provider = DefaultChannelProvider( |
116 | 0 | connectionTarget: configuration.target, |
117 | 0 | connectionKeepalive: configuration.keepalive, |
118 | 0 | connectionIdleTimeout: configuration.idleTimeout, |
119 | 0 | connectionMaxAge: configuration.maxConnectionAge, |
120 | 0 | tlsMode: tlsMode, |
121 | 0 | tlsConfiguration: configuration.transportSecurity.tlsConfiguration, |
122 | 0 | httpTargetWindowSize: configuration.http2.targetWindowSize, |
123 | 0 | httpMaxFrameSize: configuration.http2.maxFrameSize, |
124 | 0 | httpMaxResetStreams: configuration.http2.maxResetStreams, |
125 | 0 | errorDelegate: configuration.errorDelegate, |
126 | 0 | debugChannelInitializer: configuration.debugChannelInitializer |
127 | 0 | ) |
128 | | #endif |
129 | 0 |
|
130 | 0 | self._pool = PoolManager.makeInitializedPoolManager( |
131 | 0 | using: configuration.eventLoopGroup, |
132 | 0 | perPoolConfiguration: .init( |
133 | 0 | maxConnections: configuration.connectionPool.connectionsPerEventLoop, |
134 | 0 | maxWaiters: configuration.connectionPool.maxWaitersPerEventLoop, |
135 | 0 | minConnections: configuration.connectionPool.minConnectionsPerEventLoop, |
136 | 0 | loadThreshold: configuration.connectionPool.reservationLoadThreshold, |
137 | 0 | assumedMaxConcurrentStreams: 100, |
138 | 0 | connectionBackoff: configuration.connectionBackoff, |
139 | 0 | channelProvider: provider, |
140 | 0 | delegate: configuration.delegate, |
141 | 0 | statsPeriod: configuration.statsPeriod |
142 | 0 | ), |
143 | 0 | logger: configuration.backgroundActivityLogger |
144 | 0 | ) |
145 | 0 | } |
146 | | |
147 | | @inlinable |
148 | | internal func _makeStreamChannel( |
149 | | callOptions: CallOptions |
150 | 0 | ) -> (EventLoopFuture<Channel>, EventLoop) { |
151 | 0 | let preferredEventLoop = callOptions.eventLoopPreference.exact |
152 | 0 | let connectionWaitDeadline = NIODeadline.now() + self._configuration.connectionPool.maxWaitTime |
153 | 0 | let deadline = min(callOptions.timeLimit.makeDeadline(), connectionWaitDeadline) |
154 | 0 |
|
155 | 0 | let streamChannel = self._pool.makeStream( |
156 | 0 | preferredEventLoop: preferredEventLoop, |
157 | 0 | deadline: deadline, |
158 | 0 | logger: callOptions.logger |
159 | 0 | ) { channel in |
160 | 0 | return channel.eventLoop.makeSucceededVoidFuture() |
161 | 0 | } |
162 | 0 |
|
163 | 0 | return (streamChannel.futureResult, preferredEventLoop ?? streamChannel.eventLoop) |
164 | 0 | } |
165 | | |
166 | | // MARK: GRPCChannel conformance |
167 | | |
168 | | @inlinable |
169 | | internal func makeCall<Request, Response>( |
170 | | path: String, |
171 | | type: GRPCCallType, |
172 | | callOptions: CallOptions, |
173 | | interceptors: [ClientInterceptor<Request, Response>] |
174 | 0 | ) -> Call<Request, Response> where Request: Message, Response: Message { |
175 | 0 | var callOptions = callOptions |
176 | 0 | if let requestID = callOptions.requestIDProvider.requestID() { |
177 | 0 | callOptions.applyRequestID(requestID) |
178 | 0 | } |
179 | 0 |
|
180 | 0 | let (stream, eventLoop) = self._makeStreamChannel(callOptions: callOptions) |
181 | 0 |
|
182 | 0 | return Call( |
183 | 0 | path: path, |
184 | 0 | type: type, |
185 | 0 | eventLoop: eventLoop, |
186 | 0 | options: callOptions, |
187 | 0 | interceptors: interceptors, |
188 | 0 | transportFactory: .http2( |
189 | 0 | channel: stream, |
190 | 0 | authority: self._authority, |
191 | 0 | scheme: self._scheme, |
192 | 0 | maximumReceiveMessageLength: self._configuration.maximumReceiveMessageLength, |
193 | 0 | errorDelegate: self._configuration.errorDelegate |
194 | 0 | ) |
195 | 0 | ) |
196 | 0 | } |
197 | | |
198 | | @inlinable |
199 | | internal func makeCall<Request, Response>( |
200 | | path: String, |
201 | | type: GRPCCallType, |
202 | | callOptions: CallOptions, |
203 | | interceptors: [ClientInterceptor<Request, Response>] |
204 | 0 | ) -> Call<Request, Response> where Request: GRPCPayload, Response: GRPCPayload { |
205 | 0 | var callOptions = callOptions |
206 | 0 | if let requestID = callOptions.requestIDProvider.requestID() { |
207 | 0 | callOptions.applyRequestID(requestID) |
208 | 0 | } |
209 | 0 |
|
210 | 0 | let (stream, eventLoop) = self._makeStreamChannel(callOptions: callOptions) |
211 | 0 |
|
212 | 0 | return Call( |
213 | 0 | path: path, |
214 | 0 | type: type, |
215 | 0 | eventLoop: eventLoop, |
216 | 0 | options: callOptions, |
217 | 0 | interceptors: interceptors, |
218 | 0 | transportFactory: .http2( |
219 | 0 | channel: stream, |
220 | 0 | authority: self._authority, |
221 | 0 | scheme: self._scheme, |
222 | 0 | maximumReceiveMessageLength: self._configuration.maximumReceiveMessageLength, |
223 | 0 | errorDelegate: self._configuration.errorDelegate |
224 | 0 | ) |
225 | 0 | ) |
226 | 0 | } |
227 | | |
228 | | @inlinable |
229 | 0 | internal func close(promise: EventLoopPromise<Void>) { |
230 | 0 | self._pool.shutdown(mode: .forceful, promise: promise) |
231 | 0 | } |
232 | | |
233 | | @inlinable |
234 | 0 | internal func close() -> EventLoopFuture<Void> { |
235 | 0 | let promise = self._configuration.eventLoopGroup.next().makePromise(of: Void.self) |
236 | 0 | self.close(promise: promise) |
237 | 0 | return promise.futureResult |
238 | 0 | } |
239 | | |
240 | | @usableFromInline |
241 | 0 | internal func closeGracefully(deadline: NIODeadline, promise: EventLoopPromise<Void>) { |
242 | 0 | self._pool.shutdown(mode: .graceful(deadline), promise: promise) |
243 | 0 | } |
244 | | } |
245 | | |
246 | | extension CallOptions { |
247 | | @usableFromInline |
248 | 0 | mutating func applyRequestID(_ requestID: String) { |
249 | 0 | self.logger[metadataKey: MetadataKey.requestID] = "\(requestID)" |
250 | 0 | // Add the request ID header too. |
251 | 0 | if let requestIDHeader = self.requestIDHeader { |
252 | 0 | self.customMetadata.add(name: requestIDHeader, value: requestID) |
253 | 0 | } |
254 | 0 | } |
255 | | } |