/src/grpc-swift/Sources/GRPC/ClientConnection.swift
Line | Count | Source (jump to first uncovered line) |
1 | | /* |
2 | | * Copyright 2019, gRPC Authors All rights reserved. |
3 | | * |
4 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | | * you may not use this file except in compliance with the License. |
6 | | * You may obtain a copy of the License at |
7 | | * |
8 | | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | | * |
10 | | * Unless required by applicable law or agreed to in writing, software |
11 | | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | | * See the License for the specific language governing permissions and |
14 | | * limitations under the License. |
15 | | */ |
16 | | |
17 | | import Logging |
18 | | import NIOCore |
19 | | import NIOHPACK |
20 | | import NIOHTTP2 |
21 | | import NIOPosix |
22 | | import NIOTLS |
23 | | import NIOTransportServices |
24 | | import SwiftProtobuf |
25 | | |
26 | | #if os(Linux) |
27 | | @preconcurrency import Foundation |
28 | | #else |
29 | | import Foundation |
30 | | #endif |
31 | | |
32 | | #if canImport(NIOSSL) |
33 | | import NIOSSL |
34 | | #endif |
35 | | |
36 | | #if canImport(Network) |
37 | | import Network |
38 | | #endif |
39 | | |
40 | | /// Provides a single, managed connection to a server which is guaranteed to always use the same |
41 | | /// `EventLoop`. |
42 | | /// |
43 | | /// The connection to the server is provided by a single channel which will attempt to reconnect to |
44 | | /// the server if the connection is dropped. When either the client or server detects that the |
45 | | /// connection has become idle -- that is, there are no outstanding RPCs and the idle timeout has |
46 | | /// passed (30 minutes, by default) -- the underlying channel will be closed. The client will not |
47 | | /// idle the connection if any RPC exists, even if there has been no activity on the RPC for the |
48 | | /// idle timeout. Long-lived, low activity RPCs may benefit from configuring keepalive (see |
49 | | /// ``ClientConnectionKeepalive``) which periodically pings the server to ensure that the connection |
50 | | /// is not dropped. If the connection is idle a new channel will be created on-demand when the next |
51 | | /// RPC is made. |
52 | | /// |
53 | | /// The state of the connection can be observed using a ``ConnectivityStateDelegate``. |
54 | | /// |
55 | | /// Since the connection is managed, and may potentially spend long periods of time waiting for a |
56 | | /// connection to come up (cellular connections, for example), different behaviors may be used when |
57 | | /// starting a call. The different behaviors are detailed in the ``CallStartBehavior`` documentation. |
58 | | /// |
59 | | /// ### Channel Pipeline |
60 | | /// |
61 | | /// The `NIO.ChannelPipeline` for the connection is configured as such: |
62 | | /// |
63 | | /// ┌──────────────────────────┐ |
64 | | /// │ DelegatingErrorHandler │ |
65 | | /// └──────────▲───────────────┘ |
66 | | /// HTTP2Frame│ |
67 | | /// │ ⠇ ⠇ ⠇ ⠇ |
68 | | /// │ ┌┴─▼┐ ┌┴─▼┐ |
69 | | /// │ │ | │ | HTTP/2 streams |
70 | | /// │ └▲─┬┘ └▲─┬┘ |
71 | | /// │ │ │ │ │ HTTP2Frame |
72 | | /// ┌─┴────────────────┴─▼───┴─▼┐ |
73 | | /// │ HTTP2StreamMultiplexer | |
74 | | /// └─▲───────────────────────┬─┘ |
75 | | /// HTTP2Frame│ │HTTP2Frame |
76 | | /// ┌─┴───────────────────────▼─┐ |
77 | | /// │ GRPCIdleHandler │ |
78 | | /// └─▲───────────────────────┬─┘ |
79 | | /// HTTP2Frame│ │HTTP2Frame |
80 | | /// ┌─┴───────────────────────▼─┐ |
81 | | /// │ NIOHTTP2Handler │ |
82 | | /// └─▲───────────────────────┬─┘ |
83 | | /// ByteBuffer│ │ByteBuffer |
84 | | /// ┌─┴───────────────────────▼─┐ |
85 | | /// │ NIOSSLHandler │ |
86 | | /// └─▲───────────────────────┬─┘ |
87 | | /// ByteBuffer│ │ByteBuffer |
88 | | /// │ ▼ |
89 | | /// |
90 | | /// The 'GRPCIdleHandler' intercepts HTTP/2 frames and various events and is responsible for |
91 | | /// informing and controlling the state of the connection (idling and keepalive). The HTTP/2 streams |
92 | | /// are used to handle individual RPCs. |
public final class ClientConnection: Sendable {
  /// The manager this connection delegates to for obtaining the HTTP/2 multiplexer, for
  /// shutting down and for decorating loggers with connection metadata.
  private let connectionManager: ConnectionManager

  /// The configuration this connection was created with.
  internal let configuration: Configuration

  /// The URI scheme used for each RPC: 'https' when TLS is configured, 'http' otherwise.
  internal let scheme: String

  /// The URI authority used for each RPC: the TLS hostname override if one is set, otherwise
  /// the host of the configured target.
  internal let authority: String

  /// A monitor for the connectivity state.
  public let connectivity: ConnectivityStateMonitor

  /// The `EventLoop` this connection is using, as reported by the connection manager.
  public var eventLoop: EventLoop {
    self.connectionManager.eventLoop
  }

  /// Creates a new connection from the given configuration. Prefer using
  /// ``ClientConnection/secure(group:)`` to build a connection secured with TLS or
  /// ``ClientConnection/insecure(group:)`` to build a plaintext connection.
  ///
  /// - Important: Users should prefer using ``ClientConnection/secure(group:)`` to build a connection
  ///   with TLS, or ``ClientConnection/insecure(group:)`` to build a connection without TLS.
  /// - Parameter configuration: The configuration to create the connection from.
  public init(configuration: Configuration) {
    self.configuration = configuration

    // TLS being configured selects the scheme.
    if configuration.tlsConfiguration != nil {
      self.scheme = "https"
    } else {
      self.scheme = "http"
    }

    // An explicit hostname override takes precedence over the target's host.
    if let hostnameOverride = configuration.tlsConfiguration?.hostnameOverride {
      self.authority = hostnameOverride
    } else {
      self.authority = configuration.target.host
    }

    // The same monitor is exposed publicly and handed to the manager as its delegate.
    let stateMonitor = ConnectivityStateMonitor(
      delegate: configuration.connectivityStateDelegate,
      queue: configuration.connectivityStateDelegateQueue
    )
    self.connectivity = stateMonitor

    self.connectionManager = ConnectionManager(
      configuration: configuration,
      connectivityDelegate: stateMonitor,
      idleBehavior: .closeWhenIdleTimeout,
      logger: configuration.backgroundActivityLogger
    )
  }

  /// HTTP multiplexer from the underlying channel handling gRPC calls.
  internal func getMultiplexer() -> EventLoopFuture<HTTP2StreamMultiplexer> {
    self.connectionManager.getHTTP2Multiplexer()
  }

  /// Close the channel, and any connections associated with it. Any ongoing RPCs may fail.
  ///
  /// - Returns: Returns a future which will be resolved when shutdown has completed.
  public func close() -> EventLoopFuture<Void> {
    let shutdownPromise = self.eventLoop.makePromise(of: Void.self)
    self.connectionManager.shutdown(mode: .forceful, promise: shutdownPromise)
    return shutdownPromise.futureResult
  }

  /// Close the channel, and any connections associated with it. Any ongoing RPCs may fail.
  ///
  /// - Parameter promise: A promise which will be completed when shutdown has completed.
  public func close(promise: EventLoopPromise<Void>) {
    self.connectionManager.shutdown(mode: .forceful, promise: promise)
  }

  /// Attempt to gracefully shutdown the channel. New RPCs will be failed immediately and existing
  /// RPCs may continue to run until they complete.
  ///
  /// - Parameters:
  ///   - deadline: A point in time by which the graceful shutdown must have completed. If the
  ///       deadline passes and RPCs are still active then the connection will be closed forcefully
  ///       and any remaining in-flight RPCs may be failed.
  ///   - promise: A promise which will be completed when shutdown has completed.
  public func closeGracefully(deadline: NIODeadline, promise: EventLoopPromise<Void>) {
    self.connectionManager.shutdown(mode: .graceful(deadline), promise: promise)
  }

  /// Adds connection metadata to the logger in `options` and, when the request ID provider
  /// yields an ID, records it in the logger metadata and (if a header name is configured) in
  /// the custom metadata.
  ///
  /// - Parameter options: The options containing the logger to populate.
  private func populateLogger(in options: inout CallOptions) {
    // Connection metadata is attached regardless of whether a request ID is available.
    self.connectionManager.appendMetadata(to: &options.logger)

    // Nothing more to do when no request ID is provided.
    guard let requestID = options.requestIDProvider.requestID() else {
      return
    }

    options.logger[metadataKey: MetadataKey.requestID] = "\(requestID)"

    // Mirror the request ID into a header when a header name has been configured.
    if let headerName = options.requestIDHeader {
      options.customMetadata.add(name: headerName, value: requestID)
    }
  }
}
189 | | |
extension ClientConnection: GRPCChannel {
  /// Makes a `Call` for requests and responses which are `SwiftProtobuf.Message`s.
  ///
  /// - Parameters:
  ///   - path: The path of the RPC, passed through to the `Call`.
  ///   - type: The type of the RPC, passed through to the `Call`.
  ///   - callOptions: Options for the RPC. A copy is made and its logger is populated with
  ///       connection metadata and a request ID (if available) before being given to the `Call`.
  ///   - interceptors: Interceptors passed through to the `Call`.
  /// - Returns: A `Call` whose transport uses an HTTP/2 stream channel created on this connection.
  public func makeCall<Request: Message, Response: Message>(
    path: String,
    type: GRPCCallType,
    callOptions: CallOptions,
    interceptors: [ClientInterceptor<Request, Response>]
  ) -> Call<Request, Response> {
    var options = callOptions
    self.populateLogger(in: &options)
    let multiplexer = self.getMultiplexer()
    // Honour an exact event loop preference, otherwise use the multiplexer's event loop.
    let eventLoop = callOptions.eventLoopPreference.exact ?? multiplexer.eventLoop
    let streamChannel = ClientConnection.makeStreamChannel(from: multiplexer)

    return Call(
      path: path,
      type: type,
      eventLoop: eventLoop,
      options: options,
      interceptors: interceptors,
      transportFactory: .http2(
        channel: streamChannel,
        authority: self.authority,
        scheme: self.scheme,
        maximumReceiveMessageLength: self.configuration.maximumReceiveMessageLength,
        errorDelegate: self.configuration.errorDelegate
      )
    )
  }

  /// Makes a `Call` for requests and responses which are `GRPCPayload`s.
  ///
  /// - Parameters:
  ///   - path: The path of the RPC, passed through to the `Call`.
  ///   - type: The type of the RPC, passed through to the `Call`.
  ///   - callOptions: Options for the RPC. A copy is made and its logger is populated with
  ///       connection metadata and a request ID (if available) before being given to the `Call`.
  ///   - interceptors: Interceptors passed through to the `Call`.
  /// - Returns: A `Call` whose transport uses an HTTP/2 stream channel created on this connection.
  public func makeCall<Request: GRPCPayload, Response: GRPCPayload>(
    path: String,
    type: GRPCCallType,
    callOptions: CallOptions,
    interceptors: [ClientInterceptor<Request, Response>]
  ) -> Call<Request, Response> {
    var options = callOptions
    self.populateLogger(in: &options)
    let multiplexer = self.getMultiplexer()
    // Honour an exact event loop preference, otherwise use the multiplexer's event loop.
    let eventLoop = callOptions.eventLoopPreference.exact ?? multiplexer.eventLoop
    let streamChannel = ClientConnection.makeStreamChannel(from: multiplexer)

    return Call(
      path: path,
      type: type,
      eventLoop: eventLoop,
      options: options,
      interceptors: interceptors,
      transportFactory: .http2(
        channel: streamChannel,
        authority: self.authority,
        scheme: self.scheme,
        maximumReceiveMessageLength: self.configuration.maximumReceiveMessageLength,
        errorDelegate: self.configuration.errorDelegate
      )
    )
  }

  /// Returns a future stream channel which is created once `multiplexer` has resolved.
  ///
  /// Shared by both `makeCall` overloads so the promise wiring exists in one place only.
  ///
  /// - Parameter multiplexer: The future multiplexer to create the stream channel with.
  /// - Returns: A future which is completed with the stream channel, or failed with the error
  ///     `multiplexer` failed with.
  private static func makeStreamChannel(
    from multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>
  ) -> EventLoopFuture<Channel> {
    // This should be on the same event loop as the multiplexer (i.e. the event loop of the
    // underlying `Channel`).
    let channel = multiplexer.eventLoop.makePromise(of: Channel.self)
    multiplexer.whenComplete {
      ClientConnection.makeStreamChannel(using: $0, promise: channel)
    }
    return channel.futureResult
  }

  /// Completes `promise` from the outcome of obtaining the multiplexer.
  ///
  /// - Parameters:
  ///   - result: The multiplexer to create a stream channel on, or the error to fail `promise` with.
  ///   - promise: The promise handed to the multiplexer on success, or failed on failure.
  private static func makeStreamChannel(
    using result: Result<HTTP2StreamMultiplexer, Error>,
    promise: EventLoopPromise<Channel>
  ) {
    switch result {
    case let .success(multiplexer):
      // No additional stream initialization is done here: the initializer succeeds immediately.
      multiplexer.createStreamChannel(promise: promise) {
        $0.eventLoop.makeSucceededVoidFuture()
      }
    case let .failure(error):
      promise.fail(error)
    }
  }
}
273 | | |
274 | | // MARK: - Configuration structures |
275 | | |
/// Identifies the endpoint a connection should be made to.
public struct ConnectionTarget: Sendable, Hashable {
  /// The supported kinds of target.
  internal enum Wrapped: Hashable {
    case hostAndPort(String, Int)
    case unixDomainSocket(String)
    case socketAddress(SocketAddress)
    case connectedSocket(NIOBSDSocket.Handle)
    case vsockAddress(VsockAddress)
  }

  /// The kind of target being wrapped.
  internal var wrapped: Wrapped

  private init(_ wrapped: Wrapped) {
    self.wrapped = wrapped
  }

  /// The host and port. The port is 443 by default.
  public static func host(_ host: String, port: Int = 443) -> ConnectionTarget {
    .init(.hostAndPort(host, port))
  }

  /// The host and port.
  public static func hostAndPort(_ host: String, _ port: Int) -> ConnectionTarget {
    .init(.hostAndPort(host, port))
  }

  /// The path of a Unix domain socket.
  public static func unixDomainSocket(_ path: String) -> ConnectionTarget {
    .init(.unixDomainSocket(path))
  }

  /// A NIO socket address.
  public static func socketAddress(_ address: SocketAddress) -> ConnectionTarget {
    .init(.socketAddress(address))
  }

  /// A connected NIO socket.
  public static func connectedSocket(_ socket: NIOBSDSocket.Handle) -> ConnectionTarget {
    .init(.connectedSocket(socket))
  }

  /// A vsock socket.
  public static func vsockAddress(_ vsockAddress: VsockAddress) -> ConnectionTarget {
    .init(.vsockAddress(vsockAddress))
  }

  /// The host name derived from the target: the host of a host-and-port or IP socket address,
  /// "localhost" for Unix domain sockets and connected sockets, and "vsock://<cid>" for vsock
  /// addresses.
  @usableFromInline
  var host: String {
    switch self.wrapped {
    case let .hostAndPort(name, _):
      return name

    case let .socketAddress(socketAddress):
      switch socketAddress {
      case let .v4(ipv4):
        return ipv4.host
      case let .v6(ipv6):
        return ipv6.host
      case .unixDomainSocket:
        return "localhost"
      }

    case .unixDomainSocket, .connectedSocket:
      return "localhost"

    case let .vsockAddress(vsock):
      return "vsock://\(vsock.cid)"
    }
  }
}
337 | | |
/// Describes how an RPC should behave with respect to connectivity when it is started.
public struct CallStartBehavior: Hashable, Sendable {
  /// The available behaviors.
  internal enum Behavior: Hashable, Sendable {
    case waitsForConnectivity
    case fastFailure
  }

  /// The behavior being wrapped.
  internal var wrapped: Behavior

  private init(_ wrapped: Behavior) {
    self.wrapped = wrapped
  }

  /// Waits for connectivity (that is, the 'ready' connectivity state) before attempting to start
  /// an RPC. Doing so may involve multiple connection attempts.
  ///
  /// This is the preferred, and default, behaviour.
  public static let waitsForConnectivity: CallStartBehavior = .init(.waitsForConnectivity)

  /// The 'fast failure' behaviour is intended for cases where users would rather their RPC failed
  /// quickly rather than waiting for an active connection. The behaviour depends on the current
  /// connectivity state:
  ///
  /// - Idle: a connection attempt will be started and the RPC will fail if that attempt fails.
  /// - Connecting: a connection attempt is already in progress, the RPC will fail if that attempt
  ///     fails.
  /// - Ready: a connection is already active: the RPC will be started using that connection.
  /// - Transient failure: the last connection or connection attempt failed and gRPC is waiting to
  ///     connect again. The RPC will fail immediately.
  /// - Shutdown: the connection is shutdown, the RPC will fail immediately.
  public static let fastFailure: CallStartBehavior = .init(.fastFailure)
}
369 | | |
extension ClientConnection {
  /// Configuration for a ``ClientConnection``. Users should prefer using one of the
  /// ``ClientConnection`` builders: ``ClientConnection/secure(group:)`` or ``ClientConnection/insecure(group:)``.
  public struct Configuration: Sendable {
    /// The target to connect to.
    public var target: ConnectionTarget

    /// The event loop group to run the connection on.
    public var eventLoopGroup: EventLoopGroup

    /// An error delegate which is called when errors are caught. Provided delegates **must not
    /// maintain a strong reference to this `ClientConnection`**. Doing so will cause a retain
    /// cycle. Defaults to ``LoggingClientErrorDelegate``.
    public var errorDelegate: ClientErrorDelegate? = LoggingClientErrorDelegate.shared

    /// A delegate which is called when the connectivity state is changed. Defaults to `nil`.
    public var connectivityStateDelegate: ConnectivityStateDelegate?

    /// The `DispatchQueue` on which to call the connectivity state delegate. If a delegate is
    /// provided but the queue is `nil` then one will be created by gRPC. Defaults to `nil`.
    public var connectivityStateDelegateQueue: DispatchQueue?

    #if canImport(NIOSSL)
    /// TLS configuration for this connection. `nil` if TLS is not desired.
    ///
    /// - Important: `tls` is deprecated; use ``tlsConfiguration`` or one of
    ///   the ``ClientConnection/usingTLS(with:on:)`` builder functions.
    @available(*, deprecated, renamed: "tlsConfiguration")
    public var tls: TLS? {
      get {
        return self.tlsConfiguration?.asDeprecatedClientConfiguration
      }
      set {
        self.tlsConfiguration = newValue.map { .init(transforming: $0) }
      }
    }
    #endif // canImport(NIOSSL)

    /// TLS configuration for this connection. `nil` if TLS is not desired.
    public var tlsConfiguration: GRPCTLSConfiguration?

    /// The connection backoff configuration. If no connection retrying is required then this should
    /// be `nil`.
    public var connectionBackoff: ConnectionBackoff? = ConnectionBackoff()

    /// The connection keepalive configuration.
    public var connectionKeepalive = ClientConnectionKeepalive()

    /// The amount of time to wait before closing the connection. The idle timeout will start only
    /// if there are no RPCs in progress and will be cancelled as soon as any RPCs start.
    ///
    /// If a connection becomes idle, starting a new RPC will automatically create a new connection.
    ///
    /// Defaults to 30 minutes.
    public var connectionIdleTimeout: TimeAmount = .minutes(30)

    /// The maximum allowed age of a connection.
    ///
    /// If set, no new RPCs will be started on the connection after the connection has been opened
    /// for this period of time. Existing RPCs will be allowed to continue and the connection will
    /// close once all RPCs on the connection have finished. If this isn't set then connections have
    /// no limit on their lifetime.
    public var connectionMaxAge: TimeAmount? = nil

    /// The behavior used to determine when an RPC should start. That is, whether it should wait for
    /// an active connection or fail quickly if no connection is currently available.
    ///
    /// Defaults to ``CallStartBehavior/waitsForConnectivity``.
    public var callStartBehavior: CallStartBehavior = .waitsForConnectivity

    /// The HTTP/2 flow control target window size. Defaults to 8MB. Values are clamped between
    /// 1 and 2^31-1 inclusive.
    public var httpTargetWindowSize = 8 * 1024 * 1024 {
      didSet {
        self.httpTargetWindowSize = self.httpTargetWindowSize.clamped(to: 1 ... Int(Int32.max))
      }
    }

    /// The HTTP/2 max frame size. Defaults to 16384. Value is clamped between 2^14 and 2^24-1
    /// octets inclusive (the minimum and maximum allowable values - HTTP/2 RFC 7540 4.2).
    public var httpMaxFrameSize: Int = 16384 {
      didSet {
        self.httpMaxFrameSize = self.httpMaxFrameSize.clamped(to: 16384 ... 16_777_215)
      }
    }

    /// The HTTP protocol used for this connection: `.https` when ``tlsConfiguration`` is set,
    /// `.http` otherwise.
    public var httpProtocol: HTTP2FramePayloadToHTTP1ClientCodec.HTTPProtocol {
      return self.tlsConfiguration == nil ? .http : .https
    }

    /// The maximum size in bytes of a message which may be received from a server. Defaults to 4MB.
    /// Must be non-negative.
    public var maximumReceiveMessageLength: Int = 4 * 1024 * 1024 {
      willSet {
        // Zero is accepted, so the requirement is "non-negative" rather than "positive".
        precondition(newValue >= 0, "maximumReceiveMessageLength must be non-negative")
      }
    }

    /// The HTTP/2 max number of reset streams. Defaults to 32. Must be non-negative.
    public var httpMaxResetStreams: Int = 32 {
      willSet {
        precondition(newValue >= 0, "httpMaxResetStreams must be non-negative")
      }
    }

    /// A logger for background information (such as connectivity state). A separate logger for
    /// requests may be provided in the `CallOptions`.
    ///
    /// Defaults to a no-op logger.
    public var backgroundActivityLogger = Logger(
      label: "io.grpc",
      factory: { _ in SwiftLogNoOpLogHandler() }
    )

    /// A channel initializer which will be run after gRPC has initialized each channel. This may be
    /// used to add additional handlers to the pipeline and is intended for debugging.
    ///
    /// - Warning: The initializer closure may be invoked *multiple times*.
    @preconcurrency
    public var debugChannelInitializer: (@Sendable (Channel) -> EventLoopFuture<Void>)?

    #if canImport(Network)
    /// A closure allowing to customise the `NWParameters` used when establishing a connection using `NIOTransportServices`.
    @available(macOS 10.14, iOS 12.0, watchOS 6.0, tvOS 12.0, *)
    public var nwParametersConfigurator: (@Sendable (NWParameters) -> Void)? {
      get {
        // The backing storage is only ever written by the setter below, so the cast back to the
        // closure type is expected to succeed.
        self._nwParametersConfigurator as! (@Sendable (NWParameters) -> Void)?
      }
      set {
        self._nwParametersConfigurator = newValue
      }
    }

    /// Type-erased backing storage for ``nwParametersConfigurator``.
    private var _nwParametersConfigurator: (any Sendable)?
    #endif

    #if canImport(NIOSSL)
    /// Create a `Configuration` with some pre-defined defaults. Prefer using
    /// `ClientConnection.secure(group:)` to build a connection secured with TLS or
    /// `ClientConnection.insecure(group:)` to build a plaintext connection.
    ///
    /// - Parameter target: The target to connect to.
    /// - Parameter eventLoopGroup: The event loop group to run the connection on.
    /// - Parameter errorDelegate: The error delegate, defaulting to a delegate which will log only
    ///     on debug builds.
    /// - Parameter connectivityStateDelegate: A connectivity state delegate, defaulting to `nil`.
    /// - Parameter connectivityStateDelegateQueue: A `DispatchQueue` on which to call the
    ///     `connectivityStateDelegate`.
    /// - Parameter tls: TLS configuration, defaulting to `nil`.
    /// - Parameter connectionBackoff: The connection backoff configuration to use.
    /// - Parameter connectionKeepalive: The keepalive configuration to use.
    /// - Parameter connectionIdleTimeout: The amount of time to wait before closing the connection, defaulting to 30 minutes.
    /// - Parameter callStartBehavior: The behavior used to determine when a call should start in
    ///     relation to its underlying connection. Defaults to `waitsForConnectivity`.
    /// - Parameter httpTargetWindowSize: The HTTP/2 flow control target window size. Clamped
    ///     between 1 and 2^31-1 inclusive.
    /// - Parameter backgroundActivityLogger: A logger for background information (such as
    ///     connectivity state). Defaults to a no-op logger.
    /// - Parameter debugChannelInitializer: A channel initializer will be called after gRPC has
    ///     initialized the channel. Defaults to `nil`.
    @available(*, deprecated, renamed: "default(target:eventLoopGroup:)")
    @preconcurrency
    public init(
      target: ConnectionTarget,
      eventLoopGroup: EventLoopGroup,
      errorDelegate: ClientErrorDelegate? = LoggingClientErrorDelegate(),
      connectivityStateDelegate: ConnectivityStateDelegate? = nil,
      connectivityStateDelegateQueue: DispatchQueue? = nil,
      tls: Configuration.TLS? = nil,
      connectionBackoff: ConnectionBackoff? = ConnectionBackoff(),
      connectionKeepalive: ClientConnectionKeepalive = ClientConnectionKeepalive(),
      connectionIdleTimeout: TimeAmount = .minutes(30),
      callStartBehavior: CallStartBehavior = .waitsForConnectivity,
      httpTargetWindowSize: Int = 8 * 1024 * 1024,
      backgroundActivityLogger: Logger = Logger(
        label: "io.grpc",
        factory: { _ in SwiftLogNoOpLogHandler() }
      ),
      debugChannelInitializer: (@Sendable (Channel) -> EventLoopFuture<Void>)? = nil
    ) {
      self.target = target
      self.eventLoopGroup = eventLoopGroup
      self.errorDelegate = errorDelegate
      self.connectivityStateDelegate = connectivityStateDelegate
      self.connectivityStateDelegateQueue = connectivityStateDelegateQueue
      self.tlsConfiguration = tls.map { GRPCTLSConfiguration(transforming: $0) }
      self.connectionBackoff = connectionBackoff
      self.connectionKeepalive = connectionKeepalive
      self.connectionIdleTimeout = connectionIdleTimeout
      self.callStartBehavior = callStartBehavior
      // Property observers are not run for assignments made in an initializer, so the clamp
      // applied by the property's `didSet` must be applied explicitly here.
      self.httpTargetWindowSize = httpTargetWindowSize.clamped(to: 1 ... Int(Int32.max))
      self.backgroundActivityLogger = backgroundActivityLogger
      self.debugChannelInitializer = debugChannelInitializer
    }
    #endif // canImport(NIOSSL)

    /// Creates a configuration for the given target and group, leaving every other property
    /// at its declared default.
    private init(eventLoopGroup: EventLoopGroup, target: ConnectionTarget) {
      self.eventLoopGroup = eventLoopGroup
      self.target = target
    }

    /// Make a new configuration using default values.
    ///
    /// - Parameters:
    ///   - target: The target to connect to.
    ///   - eventLoopGroup: The `EventLoopGroup` providing an `EventLoop` for the connection to
    ///       run on.
    /// - Returns: A configuration with default values set.
    public static func `default`(
      target: ConnectionTarget,
      eventLoopGroup: EventLoopGroup
    ) -> Configuration {
      return .init(eventLoopGroup: eventLoopGroup, target: target)
    }
  }
}
585 | | |
586 | | // MARK: - Configuration helpers/extensions |
587 | | |
extension ClientBootstrapProtocol {
  /// Connects using whichever bootstrap method matches the kind of `target`.
  ///
  /// - Parameter target: The target to connect to.
  /// - Returns: The future `Channel` returned by the bootstrap method used.
  func connect(to target: ConnectionTarget) -> EventLoopFuture<Channel> {
    let connected: EventLoopFuture<Channel>

    switch target.wrapped {
    case let .hostAndPort(hostname, portNumber):
      connected = self.connect(host: hostname, port: portNumber)
    case let .unixDomainSocket(socketPath):
      connected = self.connect(unixDomainSocketPath: socketPath)
    case let .socketAddress(socketAddress):
      connected = self.connect(to: socketAddress)
    case let .connectedSocket(handle):
      connected = self.withConnectedSocket(handle)
    case let .vsockAddress(vsock):
      connected = self.connect(to: vsock)
    }

    return connected
  }
}
609 | | |
610 | | #if canImport(NIOSSL) |
extension ChannelPipeline.SynchronousOperations {
  /// Adds a `NIOSSLClientHandler` followed by a `TLSVerificationHandler` to the pipeline.
  ///
  /// - Parameters:
  ///   - sslContext: The context to create the client handler with, or the error to throw.
  ///   - serverHostname: The server hostname passed to the client handler.
  ///   - customVerificationCallback: A verification callback passed to the client handler when
  ///       it is not `nil`.
  ///   - logger: The logger given to the `TLSVerificationHandler`.
  /// - Throws: The error held by `sslContext`, or any error thrown when creating or adding
  ///     the handlers.
  internal func configureNIOSSLForGRPCClient(
    sslContext: Result<NIOSSLContext, Error>,
    serverHostname: String?,
    customVerificationCallback: NIOSSLCustomVerificationCallback?,
    logger: Logger
  ) throws {
    let context = try sslContext.get()
    let tlsHandler: NIOSSLClientHandler

    // The callback-taking initializer is only used when a callback was actually provided.
    switch customVerificationCallback {
    case let .some(verificationCallback):
      tlsHandler = try NIOSSLClientHandler(
        context: context,
        serverHostname: serverHostname,
        customVerificationCallback: verificationCallback
      )
    case .none:
      tlsHandler = try NIOSSLClientHandler(
        context: context,
        serverHostname: serverHostname
      )
    }

    try self.addHandler(tlsHandler)
    try self.addHandler(TLSVerificationHandler(logger: logger))
  }
}
638 | | #endif // canImport(NIOSSL) |
639 | | |
extension ChannelPipeline.SynchronousOperations {
  /// Adds the HTTP/2 and gRPC client handlers to the pipeline, in this order:
  /// `NIOHTTP2Handler`, `GRPCIdleHandler`, `HTTP2StreamMultiplexer`, `DelegatingErrorHandler`.
  ///
  /// - Parameters:
  ///   - channel: The channel the stream multiplexer is created for.
  ///   - connectionManager: The connection manager given to the idle handler.
  ///   - connectionKeepalive: The keepalive configuration given to the idle handler.
  ///   - connectionIdleTimeout: The idle timeout given to the idle handler.
  ///   - connectionMaxAge: The maximum connection age given to the idle handler.
  ///   - httpTargetWindowSize: Used as the initial window size setting and as the multiplexer's
  ///       target window size.
  ///   - httpMaxFrameSize: Used as the max frame size setting.
  ///   - httpMaxResetStreams: Used as the maximum number of recently reset streams.
  ///   - errorDelegate: The delegate given to the error handler.
  ///   - logger: The logger given to the idle and error handlers.
  /// - Throws: Any error thrown when adding a handler.
  internal func configureHTTP2AndGRPCHandlersForGRPCClient(
    channel: Channel,
    connectionManager: ConnectionManager,
    connectionKeepalive: ClientConnectionKeepalive,
    connectionIdleTimeout: TimeAmount,
    connectionMaxAge: TimeAmount?,
    httpTargetWindowSize: Int,
    httpMaxFrameSize: Int,
    httpMaxResetStreams: Int,
    errorDelegate: ClientErrorDelegate?,
    logger: Logger
  ) throws {
    let initialSettings: [HTTP2Setting] = [
      // As per the default settings for swift-nio-http2:
      HTTP2Setting(parameter: .maxHeaderListSize, value: HPACKDecoder.defaultMaxHeaderListSize),
      // We never expect (or allow) server initiated streams.
      HTTP2Setting(parameter: .maxConcurrentStreams, value: 0),
      // As configured by the user.
      HTTP2Setting(parameter: .maxFrameSize, value: httpMaxFrameSize),
      HTTP2Setting(parameter: .initialWindowSize, value: httpTargetWindowSize),
    ]

    var http2Configuration = NIOHTTP2Handler.ConnectionConfiguration()
    http2Configuration.initialSettings = initialSettings
    http2Configuration.maximumRecentlyResetStreams = httpMaxResetStreams

    // We could use 'configureHTTP2Pipeline' here, but we need to add a few handlers between the
    // two HTTP/2 handlers so we'll do it manually instead.
    let http2Handler = NIOHTTP2Handler(mode: .client, connectionConfiguration: http2Configuration)
    try self.addHandler(http2Handler)

    let streamMultiplexer = HTTP2StreamMultiplexer(
      mode: .client,
      channel: channel,
      targetWindowSize: httpTargetWindowSize,
      inboundStreamInitializer: nil
    )

    // The multiplexer is passed through the idle handler so it is only reported on
    // successful channel activation - with happy eyeballs multiple pipelines can
    // be constructed so it's not safe to report just yet.
    let idleHandler = GRPCIdleHandler(
      connectionManager: connectionManager,
      multiplexer: streamMultiplexer,
      idleTimeout: connectionIdleTimeout,
      maxAge: connectionMaxAge,
      keepalive: connectionKeepalive,
      logger: logger
    )
    try self.addHandler(idleHandler)

    try self.addHandler(streamMultiplexer)

    let errorHandler = DelegatingErrorHandler(logger: logger, delegate: errorDelegate)
    try self.addHandler(errorHandler)
  }
}
694 | | |
extension Channel {
  /// Configures an HTTP/2 client pipeline on this channel and then appends a
  /// `DelegatingErrorHandler`.
  ///
  /// - Parameters:
  ///   - errorDelegate: The delegate given to the error handler.
  ///   - logger: The logger given to the error handler.
  /// - Returns: A future completed once the error handler has been added.
  func configureGRPCClient(
    errorDelegate: ClientErrorDelegate?,
    logger: Logger
  ) -> EventLoopFuture<Void> {
    let http2Configured = self.configureHTTP2Pipeline(mode: .client, inboundStreamInitializer: nil)

    // The multiplexer produced by the HTTP/2 configuration is not needed here.
    return http2Configured.flatMap { _ -> EventLoopFuture<Void> in
      let errorHandler = DelegatingErrorHandler(logger: logger, delegate: errorDelegate)
      return self.pipeline.addHandler(errorHandler)
    }
  }
}
705 | | |
extension TimeAmount {
  /// Creates a new `TimeAmount` from the given time interval in seconds.
  ///
  /// Intervals whose nanosecond value does not fit in an `Int64` (including positive and
  /// negative infinity) saturate to `Int64.max` or `Int64.min` nanoseconds rather than
  /// trapping. Fractional nanoseconds are truncated towards zero.
  ///
  /// - Parameter timeInterval: The amount of time in seconds. Must not be NaN.
  /// - Returns: The time amount, in nanoseconds.
  static func seconds(timeInterval: TimeInterval) -> TimeAmount {
    let nanoseconds = timeInterval * 1_000_000_000

    // `Int64(_:)` traps for values outside its representable range, so saturate instead.
    // `Double(Int64.max)` rounds up to 2^63, which is itself out of range, hence `>=`.
    if nanoseconds >= Double(Int64.max) {
      return .nanoseconds(.max)
    }
    if nanoseconds <= Double(Int64.min) {
      return .nanoseconds(.min)
    }

    // NaN compares false above and still traps here, as it did before.
    return .nanoseconds(Int64(nanoseconds))
  }
}
714 | | |
extension String {
  /// Whether this string is accepted by `inet_pton` as an IPv4 (`AF_INET`) or IPv6
  /// (`AF_INET6`) address.
  var isIPAddress: Bool {
    self.withCString { cString in
      // `inet_pton` needs somewhere to write the parsed address; the result is discarded.
      var ipv4Scratch = in_addr()
      if inet_pton(AF_INET, cString, &ipv4Scratch) == 1 {
        return true
      }

      // Only try IPv6 when the string was not a valid IPv4 address.
      var ipv6Scratch = in6_addr()
      return inet_pton(AF_INET6, cString, &ipv6Scratch) == 1
    }
  }
}