/src/grpc-swift/Sources/GRPC/Interceptor/ClientTransport.swift
Line | Count | Source (jump to first uncovered line) |
1 | | /* |
2 | | * Copyright 2020, gRPC Authors All rights reserved. |
3 | | * |
4 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | | * you may not use this file except in compliance with the License. |
6 | | * You may obtain a copy of the License at |
7 | | * |
8 | | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | | * |
10 | | * Unless required by applicable law or agreed to in writing, software |
11 | | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | | * See the License for the specific language governing permissions and |
14 | | * limitations under the License. |
15 | | */ |
16 | | import Logging |
17 | | import NIOCore |
18 | | import NIOHPACK |
19 | | import NIOHTTP2 |
20 | | |
21 | | /// This class is the glue between a `NIO.Channel` and the `ClientInterceptorPipeline`. In fact |
22 | | /// this object owns the interceptor pipeline and is also a `ChannelHandler`. The caller has very |
23 | | /// little API to use on this class: they may configure the transport by adding it to a |
24 | | /// `NIO.ChannelPipeline` with `configure(_:)`, send request parts via `send(_:promise:)` and |
25 | | /// attempt to cancel the RPC with `cancel(promise:)`. Response parts – after traversing the |
26 | | /// interceptor pipeline – are emitted to the `onResponsePart` callback supplied to the initializer. |
27 | | /// |
28 | | /// In most instances the glue code is simple: transformations are applied to the request and |
29 | | /// response types used by the interceptor pipeline and the `NIO.Channel`. In addition, the |
30 | | /// transport keeps track of the state of the call and the `Channel`, taking appropriate action |
31 | | /// when these change. This includes buffering request parts from the interceptor pipeline until |
32 | | /// the `NIO.Channel` becomes active. |
33 | | /// |
34 | | /// ### Thread Safety |
35 | | /// |
36 | | /// This class is not thread safe. All methods **must** be executed on the transport's `callEventLoop`. |
37 | | @usableFromInline |
38 | | internal final class ClientTransport<Request, Response> { |
39 | | /// The `EventLoop` the call is running on. State must be accessed from this event loop. |
40 | | @usableFromInline |
41 | | internal let callEventLoop: EventLoop |
42 | | |
43 | | /// The current state of the transport. |
44 | 0 | private var state: ClientTransportState = .idle |
45 | | |
46 | | /// A promise for the underlying `Channel`. We'll succeed this when we transition to `active` |
47 | | /// and fail it when we transition to `closed`. |
48 | | private var channelPromise: EventLoopPromise<Channel>? |
49 | | |
50 | | // Note: initial capacity is 4 because it's a power of 2 and most calls are unary so will |
51 | | // have 3 parts. |
52 | | /// A buffer to store request parts and promises in before the channel has become active. |
53 | 0 | private var writeBuffer = MarkedCircularBuffer<RequestAndPromise>(initialCapacity: 4) |
54 | | |
55 | | /// The request serializer. |
56 | | private let serializer: AnySerializer<Request> |
57 | | |
58 | | /// The response deserializer. |
59 | | private let deserializer: AnyDeserializer<Response> |
60 | | |
61 | | /// A request part and a promise. |
62 | | private struct RequestAndPromise { |
63 | | var request: GRPCClientRequestPart<Request> |
64 | | var promise: EventLoopPromise<Void>? |
65 | | } |
66 | | |
67 | | /// Details about the call. |
68 | | internal let callDetails: CallDetails |
69 | | |
70 | | /// A logger. |
71 | | internal var logger: Logger |
72 | | |
73 | | /// Is the call streaming requests? |
74 | 0 | private var isStreamingRequests: Bool { |
75 | 0 | switch self.callDetails.type { |
76 | 0 | case .unary, .serverStreaming: |
77 | 0 | return false |
78 | 0 | case .clientStreaming, .bidirectionalStreaming: |
79 | 0 | return true |
80 | 0 | } |
81 | 0 | } |
82 | | |
83 | | // Our `NIO.Channel` will fire trailers and the `GRPCStatus` to us separately. It's more |
84 | | // convenient to have both at the same time when intercepting response parts. We'll hold on to the |
85 | | // trailers here and only forward them when we receive the status. |
86 | | private var trailers: HPACKHeaders? |
87 | | |
88 | | /// The interceptor pipeline connected to this transport. The pipeline also holds references |
89 | | /// to `self` which are dropped when the interceptor pipeline is closed. |
90 | | @usableFromInline |
91 | | internal var _pipeline: ClientInterceptorPipeline<Request, Response>? |
92 | | |
93 | | /// The `NIO.Channel` used by the transport, if it is available. |
94 | | private var channel: Channel? |
95 | | |
96 | | /// A callback which is invoked once when the stream channel becomes active. |
97 | | private var onStart: (() -> Void)? |
98 | | |
99 | | /// Our current state as logging metadata. |
100 | 0 | private var stateForLogging: Logger.MetadataValue { |
101 | 0 | if self.state.mayBuffer { |
102 | 0 | return "\(self.state) (\(self.writeBuffer.count) parts buffered)" |
103 | 0 | } else { |
104 | 0 | return "\(self.state)" |
105 | 0 | } |
106 | 0 | } |
107 | | |
108 | | internal init( |
109 | | details: CallDetails, |
110 | | eventLoop: EventLoop, |
111 | | interceptors: [ClientInterceptor<Request, Response>], |
112 | | serializer: AnySerializer<Request>, |
113 | | deserializer: AnyDeserializer<Response>, |
114 | | errorDelegate: ClientErrorDelegate?, |
115 | | onStart: @escaping () -> Void, |
116 | | onError: @escaping (Error) -> Void, |
117 | | onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void |
118 | 0 | ) { |
119 | 0 | self.callEventLoop = eventLoop |
120 | 0 | self.callDetails = details |
121 | 0 | self.onStart = onStart |
122 | 0 | self.logger = details.options.logger |
123 | 0 | self.serializer = serializer |
124 | 0 | self.deserializer = deserializer |
125 | 0 | // The references to self held by the pipeline are dropped when it is closed. |
126 | 0 | self._pipeline = ClientInterceptorPipeline( |
127 | 0 | eventLoop: eventLoop, |
128 | 0 | details: details, |
129 | 0 | logger: details.options.logger, |
130 | 0 | interceptors: interceptors, |
131 | 0 | errorDelegate: errorDelegate, |
132 | 0 | onError: onError, |
133 | 0 | onCancel: self.cancelFromPipeline(promise:), Unexecuted instantiation: $s4GRPC15ClientTransportC7details9eventLoop12interceptors10serializer12deserializer13errorDelegate7onStart0L5Error0L12ResponsePartACyxq_GAA11CallDetailsV_7NIOCore05EventF0_pSayAA0B11InterceptorCyxq_GGAA13AnySerializerVyxGAA0V12DeserializerVyq_GAA0bnK0_pSgyycys0N0_pcyAA010GRPCClientoP0Oyq_GctcfcyAP0tF7PromiseVyytGSgcAMcfu_ Unexecuted instantiation: $s4GRPC15ClientTransportC7details9eventLoop12interceptors10serializer12deserializer13errorDelegate7onStart0L5Error0L12ResponsePartACyxq_GAA11CallDetailsV_7NIOCore05EventF0_pSayAA0B11InterceptorCyxq_GGAA13AnySerializerVyxGAA0V12DeserializerVyq_GAA0bnK0_pSgyycys0N0_pcyAA010GRPCClientoP0Oyq_GctcfcyAP0tF7PromiseVyytGSgcAMcfu_yA9_cfu0_ |
134 | 0 | onRequestPart: self.sendFromPipeline(_:promise:), Unexecuted instantiation: $s4GRPC15ClientTransportC7details9eventLoop12interceptors10serializer12deserializer13errorDelegate7onStart0L5Error0L12ResponsePartACyxq_GAA11CallDetailsV_7NIOCore05EventF0_pSayAA0B11InterceptorCyxq_GGAA13AnySerializerVyxGAA0V12DeserializerVyq_GAA0bnK0_pSgyycys0N0_pcyAA010GRPCClientoP0Oyq_GctcfcyAA0y7RequestP0OyxG_AP0tF7PromiseVyytGSgtcAMcfu1_ Unexecuted instantiation: $s4GRPC15ClientTransportC7details9eventLoop12interceptors10serializer12deserializer13errorDelegate7onStart0L5Error0L12ResponsePartACyxq_GAA11CallDetailsV_7NIOCore05EventF0_pSayAA0B11InterceptorCyxq_GGAA13AnySerializerVyxGAA0V12DeserializerVyq_GAA0bnK0_pSgyycys0N0_pcyAA010GRPCClientoP0Oyq_GctcfcyAA0y7RequestP0OyxG_AP0tF7PromiseVyytGSgtcAMcfu1_yA8__A12_tcfu2_ |
135 | 0 | onResponsePart: onResponsePart |
136 | 0 | ) |
137 | 0 | } |
138 | | |
139 | | // MARK: - Call Object API |
140 | | |
141 | | /// Configure the transport to communicate with the server. |
142 | | /// - Parameter configurator: A callback to invoke in order to configure this transport. |
143 | | /// - Important: This *must* to be called from the `callEventLoop`. |
144 | 0 | internal func configure(_ configurator: @escaping (ChannelHandler) -> EventLoopFuture<Void>) { |
145 | 0 | self.callEventLoop.assertInEventLoop() |
146 | 0 | if self.state.configureTransport() { |
147 | 0 | self.configure(using: configurator) |
148 | 0 | } |
149 | 0 | } |
150 | | |
151 | | /// Send a request part – via the interceptor pipeline – to the server. |
152 | | /// - Parameters: |
153 | | /// - part: The part to send. |
154 | | /// - promise: A promise which will be completed when the request part has been handled. |
155 | | /// - Important: This *must* to be called from the `callEventLoop`. |
156 | | @inlinable |
157 | 0 | internal func send(_ part: GRPCClientRequestPart<Request>, promise: EventLoopPromise<Void>?) { |
158 | 0 | self.callEventLoop.assertInEventLoop() |
159 | 0 | if let pipeline = self._pipeline { |
160 | 0 | pipeline.send(part, promise: promise) |
161 | 0 | } else { |
162 | 0 | promise?.fail(GRPCError.AlreadyComplete()) |
163 | 0 | } |
164 | 0 | } |
165 | | |
166 | | /// Attempt to cancel the RPC notifying any interceptors. |
167 | | /// - Parameter promise: A promise which will be completed when the cancellation attempt has |
168 | | /// been handled. |
169 | 0 | internal func cancel(promise: EventLoopPromise<Void>?) { |
170 | 0 | self.callEventLoop.assertInEventLoop() |
171 | 0 | if let pipeline = self._pipeline { |
172 | 0 | pipeline.cancel(promise: promise) |
173 | 0 | } else { |
174 | 0 | promise?.fail(GRPCError.AlreadyComplete()) |
175 | 0 | } |
176 | 0 | } |
177 | | |
178 | | /// A request for the underlying `Channel`. |
179 | 0 | internal func getChannel() -> EventLoopFuture<Channel> { |
180 | 0 | self.callEventLoop.assertInEventLoop() |
181 | 0 |
|
182 | 0 | // Do we already have a promise? |
183 | 0 | if let promise = self.channelPromise { |
184 | 0 | return promise.futureResult |
185 | 0 | } else { |
186 | 0 | // Make and store the promise. |
187 | 0 | let promise = self.callEventLoop.makePromise(of: Channel.self) |
188 | 0 | self.channelPromise = promise |
189 | 0 |
|
190 | 0 | // Ask the state machine if we can have it. |
191 | 0 | switch self.state.getChannel() { |
192 | 0 | case .succeed: |
193 | 0 | if let channel = self.channel { |
194 | 0 | promise.succeed(channel) |
195 | 0 | } |
196 | 0 |
|
197 | 0 | case .fail: |
198 | 0 | promise.fail(GRPCError.AlreadyComplete()) |
199 | 0 |
|
200 | 0 | case .doNothing: |
201 | 0 | () |
202 | 0 | } |
203 | 0 |
|
204 | 0 | return promise.futureResult |
205 | 0 | } |
206 | 0 | } |
207 | | } |
208 | | |
209 | | // MARK: - Pipeline API |
210 | | |
211 | | extension ClientTransport { |
212 | | /// Sends a request part on the transport. Should only be called from the interceptor pipeline. |
213 | | /// - Parameters: |
214 | | /// - part: The request part to send. |
215 | | /// - promise: A promise which will be completed when the part has been handled. |
216 | | /// - Important: This *must* to be called from the `callEventLoop`. |
217 | | private func sendFromPipeline( |
218 | | _ part: GRPCClientRequestPart<Request>, |
219 | | promise: EventLoopPromise<Void>? |
220 | 0 | ) { |
221 | 0 | self.callEventLoop.assertInEventLoop() |
222 | 0 | switch self.state.send() { |
223 | 0 | case .writeToBuffer: |
224 | 0 | self.buffer(part, promise: promise) |
225 | 0 |
|
226 | 0 | case .writeToChannel: |
227 | 0 | // Banging the channel is okay here: we'll only be told to 'writeToChannel' if we're in the |
228 | 0 | // correct state, the requirements of that state are having an active `Channel`. |
229 | 0 | self.writeToChannel( |
230 | 0 | self.channel!, |
231 | 0 | part: part, |
232 | 0 | promise: promise, |
233 | 0 | flush: self.shouldFlush(after: part) |
234 | 0 | ) |
235 | 0 |
|
236 | 0 | case .alreadyComplete: |
237 | 0 | promise?.fail(GRPCError.AlreadyComplete()) |
238 | 0 | } |
239 | 0 | } |
240 | | |
241 | | /// Attempt to cancel the RPC. Should only be called from the interceptor pipeline. |
242 | | /// - Parameter promise: A promise which will be completed when the cancellation has been handled. |
243 | | /// - Important: This *must* to be called from the `callEventLoop`. |
244 | 0 | private func cancelFromPipeline(promise: EventLoopPromise<Void>?) { |
245 | 0 | self.callEventLoop.assertInEventLoop() |
246 | 0 |
|
247 | 0 | if self.state.cancel() { |
248 | 0 | let error = GRPCError.RPCCancelledByClient() |
249 | 0 | let status = error.makeGRPCStatus() |
250 | 0 | self.forwardToInterceptors(.end(status, [:])) |
251 | 0 | self.failBufferedWrites(with: error) |
252 | 0 | self.channel?.close(mode: .all, promise: nil) |
253 | 0 | self.channelPromise?.fail(error) |
254 | 0 | promise?.succeed(()) |
255 | 0 | } else { |
256 | 0 | promise?.succeed(()) |
257 | 0 | } |
258 | 0 | } |
259 | | } |
260 | | |
261 | | // MARK: - ChannelHandler API |
262 | | |
263 | | extension ClientTransport: ChannelInboundHandler { |
264 | | @usableFromInline |
265 | | typealias InboundIn = _RawGRPCClientResponsePart |
266 | | |
267 | | @usableFromInline |
268 | | typealias OutboundOut = _RawGRPCClientRequestPart |
269 | | |
270 | | @usableFromInline |
271 | 0 | internal func handlerRemoved(context: ChannelHandlerContext) { |
272 | 0 | self.dropReferences() |
273 | 0 | } |
274 | | |
275 | | @usableFromInline |
276 | 0 | internal func handlerAdded(context: ChannelHandlerContext) { |
277 | 0 | if context.channel.isActive { |
278 | 0 | self.transportActivated(channel: context.channel) |
279 | 0 | } |
280 | 0 | } |
281 | | |
282 | | @usableFromInline |
283 | 0 | internal func errorCaught(context: ChannelHandlerContext, error: Error) { |
284 | 0 | self.handleError(error) |
285 | 0 | } |
286 | | |
287 | | @usableFromInline |
288 | 0 | internal func channelActive(context: ChannelHandlerContext) { |
289 | 0 | self.transportActivated(channel: context.channel) |
290 | 0 | } |
291 | | |
292 | | @usableFromInline |
293 | 0 | internal func channelInactive(context: ChannelHandlerContext) { |
294 | 0 | self.transportDeactivated() |
295 | 0 | } |
296 | | |
297 | | @usableFromInline |
298 | 0 | internal func channelRead(context: ChannelHandlerContext, data: NIOAny) { |
299 | 0 | switch self.unwrapInboundIn(data) { |
300 | 0 | case let .initialMetadata(headers): |
301 | 0 | self.receiveFromChannel(initialMetadata: headers) |
302 | 0 |
|
303 | 0 | case let .message(box): |
304 | 0 | self.receiveFromChannel(message: box.message) |
305 | 0 |
|
306 | 0 | case let .trailingMetadata(trailers): |
307 | 0 | self.receiveFromChannel(trailingMetadata: trailers) |
308 | 0 |
|
309 | 0 | case let .status(status): |
310 | 0 | self.receiveFromChannel(status: status) |
311 | 0 | } |
312 | 0 |
|
313 | 0 | // (We're the end of the channel. No need to forward anything.) |
314 | 0 | } |
315 | | } |
316 | | |
317 | | extension ClientTransport { |
318 | | /// The `Channel` became active. Send out any buffered requests. |
319 | 0 | private func transportActivated(channel: Channel) { |
320 | 0 | if self.callEventLoop.inEventLoop { |
321 | 0 | self._transportActivated(channel: channel) |
322 | 0 | } else { |
323 | 0 | self.callEventLoop.execute { |
324 | 0 | self._transportActivated(channel: channel) |
325 | 0 | } |
326 | 0 | } |
327 | 0 | } |
328 | | |
329 | | /// On-loop implementation of `transportActivated(channel:)`. |
330 | 0 | private func _transportActivated(channel: Channel) { |
331 | 0 | self.callEventLoop.assertInEventLoop() |
332 | 0 |
|
333 | 0 | switch self.state.activate() { |
334 | 0 | case .unbuffer: |
335 | 0 | self.logger.addIPAddressMetadata(local: channel.localAddress, remote: channel.remoteAddress) |
336 | 0 | self._pipeline?.logger = self.logger |
337 | 0 | self.logger.debug("activated stream channel") |
338 | 0 | self.channel = channel |
339 | 0 | self.unbuffer() |
340 | 0 |
|
341 | 0 | case .close: |
342 | 0 | channel.close(mode: .all, promise: nil) |
343 | 0 |
|
344 | 0 | case .doNothing: |
345 | 0 | () |
346 | 0 | } |
347 | 0 | } |
348 | | |
349 | | /// The `Channel` became inactive. Fail any buffered writes and forward an error to the |
350 | | /// interceptor pipeline if necessary. |
351 | 0 | private func transportDeactivated() { |
352 | 0 | if self.callEventLoop.inEventLoop { |
353 | 0 | self._transportDeactivated() |
354 | 0 | } else { |
355 | 0 | self.callEventLoop.execute { |
356 | 0 | self._transportDeactivated() |
357 | 0 | } |
358 | 0 | } |
359 | 0 | } |
360 | | |
361 | | /// On-loop implementation of `transportDeactivated()`. |
362 | 0 | private func _transportDeactivated() { |
363 | 0 | self.callEventLoop.assertInEventLoop() |
364 | 0 | switch self.state.deactivate() { |
365 | 0 | case .doNothing: |
366 | 0 | () |
367 | 0 |
|
368 | 0 | case .tearDown: |
369 | 0 | let status = GRPCStatus(code: .unavailable, message: "Transport became inactive") |
370 | 0 | self.forwardErrorToInterceptors(status) |
371 | 0 | self.failBufferedWrites(with: status) |
372 | 0 | self.channelPromise?.fail(status) |
373 | 0 |
|
374 | 0 | case .failChannelPromise: |
375 | 0 | self.channelPromise?.fail(GRPCError.AlreadyComplete()) |
376 | 0 | } |
377 | 0 | } |
378 | | |
379 | | /// Drops any references to the `Channel` and interceptor pipeline. |
380 | 0 | private func dropReferences() { |
381 | 0 | if self.callEventLoop.inEventLoop { |
382 | 0 | self.channel = nil |
383 | 0 | } else { |
384 | 0 | self.callEventLoop.execute { |
385 | 0 | self.channel = nil |
386 | 0 | } |
387 | 0 | } |
388 | 0 | } |
389 | | |
390 | | /// Handles an error caught in the pipeline or from elsewhere. The error may be forwarded to the |
391 | | /// interceptor pipeline and any buffered writes will be failed. Any underlying `Channel` will |
392 | | /// also be closed. |
393 | 0 | internal func handleError(_ error: Error) { |
394 | 0 | if self.callEventLoop.inEventLoop { |
395 | 0 | self._handleError(error) |
396 | 0 | } else { |
397 | 0 | self.callEventLoop.execute { |
398 | 0 | self._handleError(error) |
399 | 0 | } |
400 | 0 | } |
401 | 0 | } |
402 | | |
403 | | /// On-loop implementation of `handleError(_:)`. |
404 | 0 | private func _handleError(_ error: Error) { |
405 | 0 | self.callEventLoop.assertInEventLoop() |
406 | 0 |
|
407 | 0 | switch self.state.handleError() { |
408 | 0 | case .doNothing: |
409 | 0 | () |
410 | 0 |
|
411 | 0 | case .propagateError: |
412 | 0 | self.forwardErrorToInterceptors(error) |
413 | 0 | self.failBufferedWrites(with: error) |
414 | 0 |
|
415 | 0 | case .propagateErrorAndClose: |
416 | 0 | self.forwardErrorToInterceptors(error) |
417 | 0 | self.failBufferedWrites(with: error) |
418 | 0 | self.channel?.close(mode: .all, promise: nil) |
419 | 0 | } |
420 | 0 | } |
421 | | |
422 | | /// Receive initial metadata from the `Channel`. |
423 | 0 | private func receiveFromChannel(initialMetadata headers: HPACKHeaders) { |
424 | 0 | if self.callEventLoop.inEventLoop { |
425 | 0 | self._receiveFromChannel(initialMetadata: headers) |
426 | 0 | } else { |
427 | 0 | self.callEventLoop.execute { |
428 | 0 | self._receiveFromChannel(initialMetadata: headers) |
429 | 0 | } |
430 | 0 | } |
431 | 0 | } |
432 | | |
433 | | /// On-loop implementation of `receiveFromChannel(initialMetadata:)`. |
434 | 0 | private func _receiveFromChannel(initialMetadata headers: HPACKHeaders) { |
435 | 0 | self.callEventLoop.assertInEventLoop() |
436 | 0 | if self.state.channelRead(isEnd: false) { |
437 | 0 | self.forwardToInterceptors(.metadata(headers)) |
438 | 0 | } |
439 | 0 | } |
440 | | |
441 | | /// Receive response message bytes from the `Channel`. |
442 | 0 | private func receiveFromChannel(message buffer: ByteBuffer) { |
443 | 0 | if self.callEventLoop.inEventLoop { |
444 | 0 | self._receiveFromChannel(message: buffer) |
445 | 0 | } else { |
446 | 0 | self.callEventLoop.execute { |
447 | 0 | self._receiveFromChannel(message: buffer) |
448 | 0 | } |
449 | 0 | } |
450 | 0 | } |
451 | | |
452 | | /// On-loop implementation of `receiveFromChannel(message:)`. |
453 | 0 | private func _receiveFromChannel(message buffer: ByteBuffer) { |
454 | 0 | self.callEventLoop.assertInEventLoop() |
455 | 0 | do { |
456 | 0 | let message = try self.deserializer.deserialize(byteBuffer: buffer) |
457 | 0 | if self.state.channelRead(isEnd: false) { |
458 | 0 | self.forwardToInterceptors(.message(message)) |
459 | 0 | } |
460 | 0 | } catch { |
461 | 0 | self.handleError(error) |
462 | 0 | } |
463 | 0 | } |
464 | | |
465 | | /// Receive trailing metadata from the `Channel`. |
466 | 0 | private func receiveFromChannel(trailingMetadata trailers: HPACKHeaders) { |
467 | 0 | // The `Channel` delivers trailers and `GRPCStatus` separately, we want to emit them together |
468 | 0 | // in the interceptor pipeline. |
469 | 0 | if self.callEventLoop.inEventLoop { |
470 | 0 | self.trailers = trailers |
471 | 0 | } else { |
472 | 0 | self.callEventLoop.execute { |
473 | 0 | self.trailers = trailers |
474 | 0 | } |
475 | 0 | } |
476 | 0 | } |
477 | | |
478 | | /// Receive the final status from the `Channel`. |
479 | 0 | private func receiveFromChannel(status: GRPCStatus) { |
480 | 0 | if self.callEventLoop.inEventLoop { |
481 | 0 | self._receiveFromChannel(status: status) |
482 | 0 | } else { |
483 | 0 | self.callEventLoop.execute { |
484 | 0 | self._receiveFromChannel(status: status) |
485 | 0 | } |
486 | 0 | } |
487 | 0 | } |
488 | | |
489 | | /// On-loop implementation of `receiveFromChannel(status:)`. |
490 | 0 | private func _receiveFromChannel(status: GRPCStatus) { |
491 | 0 | self.callEventLoop.assertInEventLoop() |
492 | 0 | if self.state.channelRead(isEnd: true) { |
493 | 0 | self.forwardToInterceptors(.end(status, self.trailers ?? [:])) |
494 | 0 | self.trailers = nil |
495 | 0 | } |
496 | 0 | } |
497 | | } |
498 | | |
499 | | // MARK: - State Handling |
500 | | |
501 | | private enum ClientTransportState { |
502 | | /// Idle. We're waiting for the RPC to be configured. |
503 | | /// |
504 | | /// Valid transitions: |
505 | | /// - `awaitingTransport` (the transport is being configured) |
506 | | /// - `closed` (the RPC cancels) |
507 | | case idle |
508 | | |
509 | | /// Awaiting transport. The RPC has requested transport and we're waiting for that transport to |
510 | | /// activate. We'll buffer any outbound messages from this state. Receiving messages from the |
511 | | /// transport in this state is an error. |
512 | | /// |
513 | | /// Valid transitions: |
514 | | /// - `activatingTransport` (the channel becomes active) |
515 | | /// - `closing` (the RPC cancels) |
516 | | /// - `closed` (the channel fails to become active) |
517 | | case awaitingTransport |
518 | | |
519 | | /// The transport is active but we're unbuffering any requests to write on that transport. |
520 | | /// We'll continue buffering in this state. Receiving messages from the transport in this state |
521 | | /// is okay. |
522 | | /// |
523 | | /// Valid transitions: |
524 | | /// - `active` (we finish unbuffering) |
525 | | /// - `closing` (the RPC cancels, the channel encounters an error) |
526 | | /// - `closed` (the channel becomes inactive) |
527 | | case activatingTransport |
528 | | |
529 | | /// Fully active. An RPC is in progress and is communicating over an active transport. |
530 | | /// |
531 | | /// Valid transitions: |
532 | | /// - `closing` (the RPC cancels, the channel encounters an error) |
533 | | /// - `closed` (the channel becomes inactive) |
534 | | case active |
535 | | |
536 | | /// Closing. Either the RPC was cancelled or any `Channel` associated with the transport hasn't |
537 | | /// become inactive yet. |
538 | | /// |
539 | | /// Valid transitions: |
540 | | /// - `closed` (the channel becomes inactive) |
541 | | case closing |
542 | | |
543 | | /// We're closed. Any writes from the RPC will be failed. Any responses from the transport will |
544 | | /// be ignored. |
545 | | /// |
546 | | /// Valid transitions: |
547 | | /// - none: this state is terminal. |
548 | | case closed |
549 | | |
550 | | /// Whether writes may be unbuffered in this state. |
551 | 0 | internal var isUnbuffering: Bool { |
552 | 0 | switch self { |
553 | 0 | case .activatingTransport: |
554 | 0 | return true |
555 | 0 | case .idle, .awaitingTransport, .active, .closing, .closed: |
556 | 0 | return false |
557 | 0 | } |
558 | 0 | } |
559 | | |
560 | | /// Whether this state allows writes to be buffered. (This is useful only to inform logging.) |
561 | 0 | internal var mayBuffer: Bool { |
562 | 0 | switch self { |
563 | 0 | case .idle, .activatingTransport, .awaitingTransport: |
564 | 0 | return true |
565 | 0 | case .active, .closing, .closed: |
566 | 0 | return false |
567 | 0 | } |
568 | 0 | } |
569 | | } |
570 | | |
571 | | extension ClientTransportState { |
572 | | /// The caller would like to configure the transport. Returns a boolean indicating whether we |
573 | | /// should configure it or not. |
574 | 0 | mutating func configureTransport() -> Bool { |
575 | 0 | switch self { |
576 | 0 | // We're idle until we configure. Anything else is just a repeat request to configure. |
577 | 0 | case .idle: |
578 | 0 | self = .awaitingTransport |
579 | 0 | return true |
580 | 0 |
|
581 | 0 | case .awaitingTransport, .activatingTransport, .active, .closing, .closed: |
582 | 0 | return false |
583 | 0 | } |
584 | 0 | } |
585 | | |
586 | | enum SendAction { |
587 | | /// Write the request into the buffer. |
588 | | case writeToBuffer |
589 | | /// Write the request into the channel. |
590 | | case writeToChannel |
591 | | /// The RPC has already completed, fail any promise associated with the write. |
592 | | case alreadyComplete |
593 | | } |
594 | | |
595 | | /// The pipeline would like to send a request part to the transport. |
596 | 0 | mutating func send() -> SendAction { |
597 | 0 | switch self { |
598 | 0 | // We don't have any transport yet, just buffer the part. |
599 | 0 | case .idle, .awaitingTransport, .activatingTransport: |
600 | 0 | return .writeToBuffer |
601 | 0 |
|
602 | 0 | // We have a `Channel`, we can pipe the write straight through. |
603 | 0 | case .active: |
604 | 0 | return .writeToChannel |
605 | 0 |
|
606 | 0 | // The transport is going or has gone away. Fail the promise. |
607 | 0 | case .closing, .closed: |
608 | 0 | return .alreadyComplete |
609 | 0 | } |
610 | 0 | } |
611 | | |
612 | | enum UnbufferedAction { |
613 | | /// Nothing needs to be done. |
614 | | case doNothing |
615 | | /// Succeed the channel promise associated with the transport. |
616 | | case succeedChannelPromise |
617 | | } |
618 | | |
619 | | /// We finished dealing with the buffered writes. |
620 | 0 | mutating func unbuffered() -> UnbufferedAction { |
621 | 0 | switch self { |
622 | 0 | // These can't happen since we only begin unbuffering when we transition to |
623 | 0 | // '.activatingTransport', which must come after these two states.. |
624 | 0 | case .idle, .awaitingTransport: |
625 | 0 | preconditionFailure("Requests can't be unbuffered before the transport is activated") |
626 | 0 |
|
627 | 0 | // We dealt with any buffered writes. We can become active now. This is the only way to become |
628 | 0 | // active. |
629 | 0 | case .activatingTransport: |
630 | 0 | self = .active |
631 | 0 | return .succeedChannelPromise |
632 | 0 |
|
633 | 0 | case .active: |
634 | 0 | preconditionFailure("Unbuffering completed but the transport is already active") |
635 | 0 |
|
636 | 0 | // Something caused us to close while unbuffering, that's okay, we won't take any further |
637 | 0 | // action. |
638 | 0 | case .closing, .closed: |
639 | 0 | return .doNothing |
640 | 0 | } |
641 | 0 | } |
642 | | |
643 | | /// Cancel the RPC and associated `Channel`, if possible. Returns a boolean indicated whether |
644 | | /// cancellation can go ahead (and also whether the channel should be torn down). |
645 | 0 | mutating func cancel() -> Bool { |
646 | 0 | switch self { |
647 | 0 | case .idle: |
648 | 0 | // No RPC has been started and we don't have a `Channel`. We need to tell the interceptor |
649 | 0 | // we're done, fail any writes, and then deal with the cancellation promise. |
650 | 0 | self = .closed |
651 | 0 | return true |
652 | 0 |
|
653 | 0 | case .awaitingTransport: |
654 | 0 | // An RPC has started and we're waiting for the `Channel` to activate. We'll mark ourselves as |
655 | 0 | // closing. We don't need to explicitly close the `Channel`, this will happen as a result of |
656 | 0 | // the `Channel` becoming active (see `channelActive(context:)`). |
657 | 0 | self = .closing |
658 | 0 | return true |
659 | 0 |
|
660 | 0 | case .activatingTransport: |
661 | 0 | // The RPC has started, the `Channel` is active and we're emptying our write buffer. We'll |
662 | 0 | // mark ourselves as closing: we'll error the interceptor pipeline, close the channel, fail |
663 | 0 | // any buffered writes and then complete the cancellation promise. |
664 | 0 | self = .closing |
665 | 0 | return true |
666 | 0 |
|
667 | 0 | case .active: |
668 | 0 | // The RPC and channel are up and running. We'll fail the RPC and close the channel. |
669 | 0 | self = .closing |
670 | 0 | return true |
671 | 0 |
|
672 | 0 | case .closing, .closed: |
673 | 0 | // We're already closing or closing. The cancellation is too late. |
674 | 0 | return false |
675 | 0 | } |
676 | 0 | } |
677 | | |
678 | | enum ActivateAction { |
679 | | case unbuffer |
680 | | case close |
681 | | case doNothing |
682 | | } |
683 | | |
684 | | /// `channelActive` was invoked on the transport by the `Channel`. |
685 | 0 | mutating func activate() -> ActivateAction { |
686 | 0 | // The channel has become active: what now? |
687 | 0 | switch self { |
688 | 0 | case .idle: |
689 | 0 | preconditionFailure("Can't activate an idle transport") |
690 | 0 |
|
691 | 0 | case .awaitingTransport: |
692 | 0 | self = .activatingTransport |
693 | 0 | return .unbuffer |
694 | 0 |
|
695 | 0 | case .activatingTransport, .active: |
696 | 0 | // Already activated. |
697 | 0 | return .doNothing |
698 | 0 |
|
699 | 0 | case .closing: |
700 | 0 | // We remain in closing: we only transition to closed on 'channelInactive'. |
701 | 0 | return .close |
702 | 0 |
|
703 | 0 | case .closed: |
704 | 0 | preconditionFailure("Invalid state: stream is already inactive") |
705 | 0 | } |
706 | 0 | } |
707 | | |
708 | | enum ChannelInactiveAction { |
709 | | /// Tear down the transport; forward an error to the interceptors and fail any buffered writes. |
710 | | case tearDown |
711 | | /// Fail the 'Channel' promise, if one exists; the RPC is already complete. |
712 | | case failChannelPromise |
713 | | /// Do nothing. |
714 | | case doNothing |
715 | | } |
716 | | |
717 | | /// `channelInactive` was invoked on the transport by the `Channel`. |
718 | 0 | mutating func deactivate() -> ChannelInactiveAction { |
719 | 0 | switch self { |
720 | 0 | case .idle: |
721 | 0 | // We can't become inactive before we've requested a `Channel`. |
722 | 0 | preconditionFailure("Can't deactivate an idle transport") |
723 | 0 |
|
724 | 0 | case .awaitingTransport, .activatingTransport, .active: |
725 | 0 | // We're activating the transport - i.e. offloading any buffered requests - and the channel |
726 | 0 | // became inactive. We haven't received an error (otherwise we'd be `closing`) so we should |
727 | 0 | // synthesize an error status to fail the RPC with. |
728 | 0 | self = .closed |
729 | 0 | return .tearDown |
730 | 0 |
|
731 | 0 | case .closing: |
732 | 0 | // We were already closing, now we're fully closed. |
733 | 0 | self = .closed |
734 | 0 | return .failChannelPromise |
735 | 0 |
|
736 | 0 | case .closed: |
737 | 0 | // We're already closed. |
738 | 0 | return .doNothing |
739 | 0 | } |
740 | 0 | } |
741 | | |
742 | | /// `channelRead` was invoked on the transport by the `Channel`. Returns a boolean value |
743 | | /// indicating whether the part that was read should be forwarded to the interceptor pipeline. |
744 | 0 | mutating func channelRead(isEnd: Bool) -> Bool { |
745 | 0 | switch self { |
746 | 0 | case .idle, .awaitingTransport: |
747 | 0 | // If there's no `Channel` or the `Channel` isn't active, then we can't read anything. |
748 | 0 | preconditionFailure("Can't receive response part on idle transport") |
749 | 0 |
|
750 | 0 | case .activatingTransport, .active: |
751 | 0 | // We have an active `Channel`, we can forward the request part but we may need to start |
752 | 0 | // closing if we see the status, since it indicates the call is terminating. |
753 | 0 | if isEnd { |
754 | 0 | self = .closing |
755 | 0 | } |
756 | 0 | return true |
757 | 0 |
|
758 | 0 | case .closing, .closed: |
759 | 0 | // We closed early, ignore any reads. |
760 | 0 | return false |
761 | 0 | } |
762 | 0 | } |
763 | | |
764 | | enum HandleErrorAction { |
765 | | /// Propagate the error to the interceptor pipeline and fail any buffered writes. |
766 | | case propagateError |
767 | | /// As above, but close the 'Channel' as well. |
768 | | case propagateErrorAndClose |
769 | | /// No action is required. |
770 | | case doNothing |
771 | | } |
772 | | |
773 | | /// An error was caught. |
774 | 0 | mutating func handleError() -> HandleErrorAction { |
775 | 0 | switch self { |
776 | 0 | case .idle: |
777 | 0 | // The `Channel` can't error if it doesn't exist. |
778 | 0 | preconditionFailure("Can't catch error on idle transport") |
779 | 0 |
|
780 | 0 | case .awaitingTransport: |
781 | 0 | // We're waiting for the `Channel` to become active. We're toast now, so close, failing any |
782 | 0 | // buffered writes along the way. |
783 | 0 | self = .closing |
784 | 0 | return .propagateError |
785 | 0 |
|
786 | 0 | case .activatingTransport, |
787 | 0 | .active: |
788 | 0 | // We're either fully active or unbuffering. Forward an error, fail any writes and then close. |
789 | 0 | self = .closing |
790 | 0 | return .propagateErrorAndClose |
791 | 0 |
|
792 | 0 | case .closing, .closed: |
793 | 0 | // We're already closing/closed, we can ignore this. |
794 | 0 | return .doNothing |
795 | 0 | } |
796 | 0 | } |
797 | | |
798 | | enum GetChannelAction { |
799 | | /// No action is required. |
800 | | case doNothing |
801 | | /// Succeed the Channel promise. |
802 | | case succeed |
803 | | /// Fail the 'Channel' promise, the RPC is already complete. |
804 | | case fail |
805 | | } |
806 | | |
807 | | /// The caller has asked for the underlying `Channel`. |
808 | 0 | mutating func getChannel() -> GetChannelAction { |
809 | 0 | switch self { |
810 | 0 | case .idle, .awaitingTransport, .activatingTransport: |
811 | 0 | // Do nothing, we'll complete the promise when we become active or closed. |
812 | 0 | return .doNothing |
813 | 0 |
|
814 | 0 | case .active: |
815 | 0 | // We're already active, so there was no promise to succeed when we made this transition. We |
816 | 0 | // can complete it now. |
817 | 0 | return .succeed |
818 | 0 |
|
819 | 0 | case .closing: |
820 | 0 | // We'll complete the promise when we transition to closed. |
821 | 0 | return .doNothing |
822 | 0 |
|
823 | 0 | case .closed: |
824 | 0 | // We're already closed; there was no promise to fail when we made this transition. We can go |
825 | 0 | // ahead and fail it now though. |
826 | 0 | return .fail |
827 | 0 | } |
828 | 0 | } |
829 | | } |
830 | | |
831 | | // MARK: - State Actions |
832 | | |
833 | | extension ClientTransport { |
834 | | /// Configures this transport with the `configurator`. |
835 | 0 | private func configure(using configurator: (ChannelHandler) -> EventLoopFuture<Void>) { |
836 | 0 | configurator(self).whenFailure { error in |
837 | 0 | // We might be on a different EL, but `handleError` will sort that out for us, so no need to |
838 | 0 | // hop. |
839 | 0 | if error is GRPCStatus || error is GRPCStatusTransformable { |
840 | 0 | self.handleError(error) |
841 | 0 | } else { |
842 | 0 | // Fallback to something which will mark the RPC as 'unavailable'. |
843 | 0 | self.handleError(ConnectionFailure(reason: error)) |
844 | 0 | } |
845 | 0 | } |
846 | 0 | } |
847 | | |
  /// Append a request part to the write buffer.
  ///
  /// Parts are held here until the transport can write them to the `Channel`, at which point
  /// `unbuffer()` drains the buffer.
  /// - Parameters:
  ///   - part: The request part to buffer.
  ///   - promise: A promise to complete when the request part has been sent.
  private func buffer(
    _ part: GRPCClientRequestPart<Request>,
    promise: EventLoopPromise<Void>?
  ) {
    // Buffering must happen on the call's event loop; the buffer is not synchronized.
    self.callEventLoop.assertInEventLoop()
    self.logger.trace(
      "buffering request part",
      metadata: [
        "request_part": "\(part.name)",
        "call_state": self.stateForLogging,
      ]
    )
    // The promise travels with the part so it can be completed (or failed) when the part is
    // eventually written or the buffer is dropped.
    self.writeBuffer.append(.init(request: part, promise: promise))
  }
866 | | |
  /// Writes any buffered request parts to the `Channel`.
  ///
  /// A no-op when no `Channel` exists yet. Individual writes are not flushed; at most one flush
  /// per batch is issued, once a flush-worthy part (see `shouldFlush(after:)`) has been written.
  private func unbuffer() {
    self.callEventLoop.assertInEventLoop()

    // No channel means nothing to write to; parts stay buffered.
    guard let channel = self.channel else {
      return
    }

    // Save any flushing until we're done writing.
    var shouldFlush = false

    self.logger.trace(
      "unbuffering request parts",
      metadata: [
        "request_parts": "\(self.writeBuffer.count)"
      ]
    )

    // Why the double loop? A promise completed as a result of the flush may enqueue more writes,
    // or causes us to change state (i.e. we may have to close). If we didn't loop around then we
    // may miss more buffered writes.
    while self.state.isUnbuffering, !self.writeBuffer.isEmpty {
      // Pull out as many writes as possible.
      while let write = self.writeBuffer.popFirst() {
        self.logger.trace(
          "unbuffering request part",
          metadata: [
            "request_part": "\(write.request.name)"
          ]
        )

        // Once any part in this batch requires a flush, keep that decision sticky.
        if !shouldFlush {
          shouldFlush = self.shouldFlush(after: write.request)
        }

        // Flushing is deferred to the end of the batch below.
        self.writeToChannel(channel, part: write.request, promise: write.promise, flush: false)
      }

      // Okay, flush now.
      if shouldFlush {
        shouldFlush = false
        channel.flush()
      }
    }

    // The loop exits either because the buffer drained or because the state changed (e.g. an
    // error moved us out of unbuffering); log which one happened.
    if self.writeBuffer.isEmpty {
      self.logger.trace("request buffer drained")
    } else {
      self.logger.notice("unbuffering aborted", metadata: ["call_state": self.stateForLogging])
    }

    // We're unbuffered. What now?
    switch self.state.unbuffered() {
    case .doNothing:
      ()
    case .succeedChannelPromise:
      self.channelPromise?.succeed(channel)
    }
  }
926 | | |
927 | | /// Fails any promises that come with buffered writes with `error`. |
928 | | /// - Parameter error: The `Error` to fail promises with. |
929 | 0 | private func failBufferedWrites(with error: Error) { |
930 | 0 | self.logger.trace("failing buffered writes", metadata: ["call_state": self.stateForLogging]) |
931 | 0 |
|
932 | 0 | while let write = self.writeBuffer.popFirst() { |
933 | 0 | write.promise?.fail(error) |
934 | 0 | } |
935 | 0 | } |
936 | | |
937 | | /// Write a request part to the `Channel`. |
938 | | /// - Parameters: |
939 | | /// - channel: The `Channel` to write `part` to. |
940 | | /// - part: The request part to write. |
941 | | /// - promise: A promise to complete once the write has been completed. |
942 | | /// - flush: Whether to flush the `Channel` after writing. |
943 | | private func writeToChannel( |
944 | | _ channel: Channel, |
945 | | part: GRPCClientRequestPart<Request>, |
946 | | promise: EventLoopPromise<Void>?, |
947 | | flush: Bool |
948 | 0 | ) { |
949 | 0 | switch part { |
950 | 0 | case let .metadata(headers): |
951 | 0 | let head = self.makeRequestHead(with: headers) |
952 | 0 | channel.write(self.wrapOutboundOut(.head(head)), promise: promise) |
953 | 0 | // Messages are buffered by this class and in the async writer for async calls. Initially the |
954 | 0 | // async writer is not allowed to emit messages; the call to 'onStart()' signals that messages |
955 | 0 | // may be emitted. We call it here to avoid races between writing headers and messages. |
956 | 0 | self.onStart?() |
957 | 0 | self.onStart = nil |
958 | 0 |
|
959 | 0 | case let .message(request, metadata): |
960 | 0 | do { |
961 | 0 | let bytes = try self.serializer.serialize(request, allocator: channel.allocator) |
962 | 0 | let message = _MessageContext<ByteBuffer>(bytes, compressed: metadata.compress) |
963 | 0 | channel.write(self.wrapOutboundOut(.message(message)), promise: promise) |
964 | 0 | } catch { |
965 | 0 | self.handleError(error) |
966 | 0 | } |
967 | 0 |
|
968 | 0 | case .end: |
969 | 0 | channel.write(self.wrapOutboundOut(.end), promise: promise) |
970 | 0 | } |
971 | 0 |
|
972 | 0 | if flush { |
973 | 0 | channel.flush() |
974 | 0 | } |
975 | 0 | } |
976 | | |
977 | | /// Forward the response part to the interceptor pipeline. |
978 | | /// - Parameter part: The response part to forward. |
979 | 0 | private func forwardToInterceptors(_ part: GRPCClientResponsePart<Response>) { |
980 | 0 | self.callEventLoop.assertInEventLoop() |
981 | 0 | self._pipeline?.receive(part) |
982 | 0 | } |
983 | | |
984 | | /// Forward the error to the interceptor pipeline. |
985 | | /// - Parameter error: The error to forward. |
986 | 0 | private func forwardErrorToInterceptors(_ error: Error) { |
987 | 0 | self.callEventLoop.assertInEventLoop() |
988 | 0 | self._pipeline?.errorCaught(error) |
989 | 0 | } |
990 | | } |
991 | | |
992 | | // MARK: - Helpers |
993 | | |
994 | | extension ClientTransport { |
995 | | /// Returns whether the `Channel` should be flushed after writing the given part to it. |
996 | 0 | private func shouldFlush(after part: GRPCClientRequestPart<Request>) -> Bool { |
997 | 0 | switch part { |
998 | 0 | case .metadata: |
999 | 0 | // If we're not streaming requests then we hold off on the flush until we see end. |
1000 | 0 | return self.isStreamingRequests |
1001 | 0 |
|
1002 | 0 | case let .message(_, metadata): |
1003 | 0 | // Message flushing is determined by caller preference. |
1004 | 0 | return metadata.flush |
1005 | 0 |
|
1006 | 0 | case .end: |
1007 | 0 | // Always flush at the end of the request stream. |
1008 | 0 | return true |
1009 | 0 | } |
1010 | 0 | } |
1011 | | |
1012 | | /// Make a `_GRPCRequestHead` with the provided metadata. |
1013 | 0 | private func makeRequestHead(with metadata: HPACKHeaders) -> _GRPCRequestHead { |
1014 | 0 | return _GRPCRequestHead( |
1015 | 0 | method: self.callDetails.options.cacheable ? "GET" : "POST", |
1016 | 0 | scheme: self.callDetails.scheme, |
1017 | 0 | path: self.callDetails.path, |
1018 | 0 | host: self.callDetails.authority, |
1019 | 0 | deadline: self.callDetails.options.timeLimit.makeDeadline(), |
1020 | 0 | customMetadata: metadata, |
1021 | 0 | encoding: self.callDetails.options.messageEncoding |
1022 | 0 | ) |
1023 | 0 | } |
1024 | | } |
1025 | | |
extension GRPCClientRequestPart {
  /// A short name for the request part, used in log metadata.
  fileprivate var name: String {
    switch self {
    case .metadata: return "metadata"
    case .message: return "message"
    case .end: return "end"
    }
  }
}
1039 | | |
// A wrapper for connection errors: preserves the underlying error while also allowing a
// 'GRPCStatus' with code '.unavailable' to be derived from it.
internal struct ConnectionFailure: Error, GRPCStatusTransformable, CustomStringConvertible {
  /// The reason the connection failed.
  var reason: Error

  init(reason: Error) {
    self.reason = reason
  }

  var description: String {
    return "\(self.reason)"
  }

  func makeGRPCStatus() -> GRPCStatus {
    // Connection failures surface to callers as 'unavailable'; the original error is kept as
    // both the message and the cause.
    return GRPCStatus(
      code: .unavailable,
      message: "\(self.reason)",
      cause: self.reason
    )
  }
}