Coverage Report

Created: 2025-09-08 07:07

/src/grpc-swift/Sources/GRPC/Interceptor/ClientTransport.swift
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright 2020, gRPC Authors All rights reserved.
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
import Logging
17
import NIOCore
18
import NIOHPACK
19
import NIOHTTP2
20
21
/// This class is the glue between a `NIO.Channel` and the `ClientInterceptorPipeline`. In fact
22
/// this object owns the interceptor pipeline and is also a `ChannelHandler`. The caller has very
23
/// little API to use on this class: they may configure the transport by adding it to a
24
/// `NIO.ChannelPipeline` with `configure(_:)`, send request parts via `send(_:promise:)` and
25
/// attempt to cancel the RPC with `cancel(promise:)`. Response parts – after traversing the
26
/// interceptor pipeline – are emitted to the `onResponsePart` callback supplied to the initializer.
27
///
28
/// In most instances the glue code is simple: transformations are applied to the request and
29
/// response types used by the interceptor pipeline and the `NIO.Channel`. In addition, the
30
/// transport keeps track of the state of the call and the `Channel`, taking appropriate action
31
/// when these change. This includes buffering request parts from the interceptor pipeline until
32
/// the `NIO.Channel` becomes active.
33
///
34
/// ### Thread Safety
35
///
36
/// This class is not thread safe. All methods **must** be executed on the transport's `callEventLoop`.
37
@usableFromInline
38
internal final class ClientTransport<Request, Response> {
39
  /// The `EventLoop` the call is running on. State must be accessed from this event loop.
40
  @usableFromInline
41
  internal let callEventLoop: EventLoop
42
43
  /// The current state of the transport.
44
0
  private var state: ClientTransportState = .idle
45
46
  /// A promise for the underlying `Channel`. We'll succeed this when we transition to `active`
47
  /// and fail it when we transition to `closed`.
48
  private var channelPromise: EventLoopPromise<Channel>?
49
50
  // Note: initial capacity is 4 because it's a power of 2 and most calls are unary so will
51
  // have 3 parts.
52
  /// A buffer to store request parts and promises in before the channel has become active.
53
0
  private var writeBuffer = MarkedCircularBuffer<RequestAndPromise>(initialCapacity: 4)
54
55
  /// The request serializer.
56
  private let serializer: AnySerializer<Request>
57
58
  /// The response deserializer.
59
  private let deserializer: AnyDeserializer<Response>
60
61
  /// A request part and a promise.
62
  private struct RequestAndPromise {
63
    var request: GRPCClientRequestPart<Request>
64
    var promise: EventLoopPromise<Void>?
65
  }
66
67
  /// Details about the call.
68
  internal let callDetails: CallDetails
69
70
  /// A logger.
71
  internal var logger: Logger
72
73
  /// Is the call streaming requests?
74
0
  private var isStreamingRequests: Bool {
75
0
    switch self.callDetails.type {
76
0
    case .unary, .serverStreaming:
77
0
      return false
78
0
    case .clientStreaming, .bidirectionalStreaming:
79
0
      return true
80
0
    }
81
0
  }
82
83
  // Our `NIO.Channel` will fire trailers and the `GRPCStatus` to us separately. It's more
84
  // convenient to have both at the same time when intercepting response parts. We'll hold on to the
85
  // trailers here and only forward them when we receive the status.
86
  private var trailers: HPACKHeaders?
87
88
  /// The interceptor pipeline connected to this transport. The pipeline also holds references
89
  /// to `self` which are dropped when the interceptor pipeline is closed.
90
  @usableFromInline
91
  internal var _pipeline: ClientInterceptorPipeline<Request, Response>?
92
93
  /// The `NIO.Channel` used by the transport, if it is available.
94
  private var channel: Channel?
95
96
  /// A callback which is invoked once when the stream channel becomes active.
97
  private var onStart: (() -> Void)?
98
99
  /// Our current state as logging metadata.
100
0
  private var stateForLogging: Logger.MetadataValue {
101
0
    if self.state.mayBuffer {
102
0
      return "\(self.state) (\(self.writeBuffer.count) parts buffered)"
103
0
    } else {
104
0
      return "\(self.state)"
105
0
    }
106
0
  }
107
108
  internal init(
109
    details: CallDetails,
110
    eventLoop: EventLoop,
111
    interceptors: [ClientInterceptor<Request, Response>],
112
    serializer: AnySerializer<Request>,
113
    deserializer: AnyDeserializer<Response>,
114
    errorDelegate: ClientErrorDelegate?,
115
    onStart: @escaping () -> Void,
116
    onError: @escaping (Error) -> Void,
117
    onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
118
0
  ) {
119
0
    self.callEventLoop = eventLoop
120
0
    self.callDetails = details
121
0
    self.onStart = onStart
122
0
    self.logger = details.options.logger
123
0
    self.serializer = serializer
124
0
    self.deserializer = deserializer
125
0
    // The references to self held by the pipeline are dropped when it is closed.
126
0
    self._pipeline = ClientInterceptorPipeline(
127
0
      eventLoop: eventLoop,
128
0
      details: details,
129
0
      logger: details.options.logger,
130
0
      interceptors: interceptors,
131
0
      errorDelegate: errorDelegate,
132
0
      onError: onError,
133
0
      onCancel: self.cancelFromPipeline(promise:),
Unexecuted instantiation: $s4GRPC15ClientTransportC7details9eventLoop12interceptors10serializer12deserializer13errorDelegate7onStart0L5Error0L12ResponsePartACyxq_GAA11CallDetailsV_7NIOCore05EventF0_pSayAA0B11InterceptorCyxq_GGAA13AnySerializerVyxGAA0V12DeserializerVyq_GAA0bnK0_pSgyycys0N0_pcyAA010GRPCClientoP0Oyq_GctcfcyAP0tF7PromiseVyytGSgcAMcfu_
Unexecuted instantiation: $s4GRPC15ClientTransportC7details9eventLoop12interceptors10serializer12deserializer13errorDelegate7onStart0L5Error0L12ResponsePartACyxq_GAA11CallDetailsV_7NIOCore05EventF0_pSayAA0B11InterceptorCyxq_GGAA13AnySerializerVyxGAA0V12DeserializerVyq_GAA0bnK0_pSgyycys0N0_pcyAA010GRPCClientoP0Oyq_GctcfcyAP0tF7PromiseVyytGSgcAMcfu_yA9_cfu0_
134
0
      onRequestPart: self.sendFromPipeline(_:promise:),
Unexecuted instantiation: $s4GRPC15ClientTransportC7details9eventLoop12interceptors10serializer12deserializer13errorDelegate7onStart0L5Error0L12ResponsePartACyxq_GAA11CallDetailsV_7NIOCore05EventF0_pSayAA0B11InterceptorCyxq_GGAA13AnySerializerVyxGAA0V12DeserializerVyq_GAA0bnK0_pSgyycys0N0_pcyAA010GRPCClientoP0Oyq_GctcfcyAA0y7RequestP0OyxG_AP0tF7PromiseVyytGSgtcAMcfu1_
Unexecuted instantiation: $s4GRPC15ClientTransportC7details9eventLoop12interceptors10serializer12deserializer13errorDelegate7onStart0L5Error0L12ResponsePartACyxq_GAA11CallDetailsV_7NIOCore05EventF0_pSayAA0B11InterceptorCyxq_GGAA13AnySerializerVyxGAA0V12DeserializerVyq_GAA0bnK0_pSgyycys0N0_pcyAA010GRPCClientoP0Oyq_GctcfcyAA0y7RequestP0OyxG_AP0tF7PromiseVyytGSgtcAMcfu1_yA8__A12_tcfu2_
135
0
      onResponsePart: onResponsePart
136
0
    )
137
0
  }
138
139
  // MARK: - Call Object API
140
141
  /// Configure the transport to communicate with the server.
142
  /// - Parameter configurator: A callback to invoke in order to configure this transport.
143
  /// - Important: This *must* to be called from the `callEventLoop`.
144
0
  internal func configure(_ configurator: @escaping (ChannelHandler) -> EventLoopFuture<Void>) {
145
0
    self.callEventLoop.assertInEventLoop()
146
0
    if self.state.configureTransport() {
147
0
      self.configure(using: configurator)
148
0
    }
149
0
  }
150
151
  /// Send a request part – via the interceptor pipeline – to the server.
152
  /// - Parameters:
153
  ///   - part: The part to send.
154
  ///   - promise: A promise which will be completed when the request part has been handled.
155
  /// - Important: This *must* to be called from the `callEventLoop`.
156
  @inlinable
157
0
  internal func send(_ part: GRPCClientRequestPart<Request>, promise: EventLoopPromise<Void>?) {
158
0
    self.callEventLoop.assertInEventLoop()
159
0
    if let pipeline = self._pipeline {
160
0
      pipeline.send(part, promise: promise)
161
0
    } else {
162
0
      promise?.fail(GRPCError.AlreadyComplete())
163
0
    }
164
0
  }
165
166
  /// Attempt to cancel the RPC notifying any interceptors.
167
  /// - Parameter promise: A promise which will be completed when the cancellation attempt has
168
  ///   been handled.
169
0
  internal func cancel(promise: EventLoopPromise<Void>?) {
170
0
    self.callEventLoop.assertInEventLoop()
171
0
    if let pipeline = self._pipeline {
172
0
      pipeline.cancel(promise: promise)
173
0
    } else {
174
0
      promise?.fail(GRPCError.AlreadyComplete())
175
0
    }
176
0
  }
177
178
  /// A request for the underlying `Channel`.
179
0
  internal func getChannel() -> EventLoopFuture<Channel> {
180
0
    self.callEventLoop.assertInEventLoop()
181
0
182
0
    // Do we already have a promise?
183
0
    if let promise = self.channelPromise {
184
0
      return promise.futureResult
185
0
    } else {
186
0
      // Make and store the promise.
187
0
      let promise = self.callEventLoop.makePromise(of: Channel.self)
188
0
      self.channelPromise = promise
189
0
190
0
      // Ask the state machine if we can have it.
191
0
      switch self.state.getChannel() {
192
0
      case .succeed:
193
0
        if let channel = self.channel {
194
0
          promise.succeed(channel)
195
0
        }
196
0
197
0
      case .fail:
198
0
        promise.fail(GRPCError.AlreadyComplete())
199
0
200
0
      case .doNothing:
201
0
        ()
202
0
      }
203
0
204
0
      return promise.futureResult
205
0
    }
206
0
  }
207
}
208
209
// MARK: - Pipeline API
210
211
extension ClientTransport {
212
  /// Sends a request part on the transport. Should only be called from the interceptor pipeline.
213
  /// - Parameters:
214
  ///   - part: The request part to send.
215
  ///   - promise: A promise which will be completed when the part has been handled.
216
  /// - Important: This *must* to be called from the `callEventLoop`.
217
  private func sendFromPipeline(
218
    _ part: GRPCClientRequestPart<Request>,
219
    promise: EventLoopPromise<Void>?
220
0
  ) {
221
0
    self.callEventLoop.assertInEventLoop()
222
0
    switch self.state.send() {
223
0
    case .writeToBuffer:
224
0
      self.buffer(part, promise: promise)
225
0
226
0
    case .writeToChannel:
227
0
      // Banging the channel is okay here: we'll only be told to 'writeToChannel' if we're in the
228
0
      // correct state, the requirements of that state are having an active `Channel`.
229
0
      self.writeToChannel(
230
0
        self.channel!,
231
0
        part: part,
232
0
        promise: promise,
233
0
        flush: self.shouldFlush(after: part)
234
0
      )
235
0
236
0
    case .alreadyComplete:
237
0
      promise?.fail(GRPCError.AlreadyComplete())
238
0
    }
239
0
  }
240
241
  /// Attempt to cancel the RPC. Should only be called from the interceptor pipeline.
242
  /// - Parameter promise: A promise which will be completed when the cancellation has been handled.
243
  /// - Important: This *must* to be called from the `callEventLoop`.
244
0
  private func cancelFromPipeline(promise: EventLoopPromise<Void>?) {
245
0
    self.callEventLoop.assertInEventLoop()
246
0
247
0
    if self.state.cancel() {
248
0
      let error = GRPCError.RPCCancelledByClient()
249
0
      let status = error.makeGRPCStatus()
250
0
      self.forwardToInterceptors(.end(status, [:]))
251
0
      self.failBufferedWrites(with: error)
252
0
      self.channel?.close(mode: .all, promise: nil)
253
0
      self.channelPromise?.fail(error)
254
0
      promise?.succeed(())
255
0
    } else {
256
0
      promise?.succeed(())
257
0
    }
258
0
  }
259
}
260
261
// MARK: - ChannelHandler API
262
263
extension ClientTransport: ChannelInboundHandler {
264
  @usableFromInline
265
  typealias InboundIn = _RawGRPCClientResponsePart
266
267
  @usableFromInline
268
  typealias OutboundOut = _RawGRPCClientRequestPart
269
270
  @usableFromInline
271
0
  internal func handlerRemoved(context: ChannelHandlerContext) {
272
0
    self.dropReferences()
273
0
  }
274
275
  @usableFromInline
276
0
  internal func handlerAdded(context: ChannelHandlerContext) {
277
0
    if context.channel.isActive {
278
0
      self.transportActivated(channel: context.channel)
279
0
    }
280
0
  }
281
282
  @usableFromInline
283
0
  internal func errorCaught(context: ChannelHandlerContext, error: Error) {
284
0
    self.handleError(error)
285
0
  }
286
287
  @usableFromInline
288
0
  internal func channelActive(context: ChannelHandlerContext) {
289
0
    self.transportActivated(channel: context.channel)
290
0
  }
291
292
  @usableFromInline
293
0
  internal func channelInactive(context: ChannelHandlerContext) {
294
0
    self.transportDeactivated()
295
0
  }
296
297
  @usableFromInline
298
0
  internal func channelRead(context: ChannelHandlerContext, data: NIOAny) {
299
0
    switch self.unwrapInboundIn(data) {
300
0
    case let .initialMetadata(headers):
301
0
      self.receiveFromChannel(initialMetadata: headers)
302
0
303
0
    case let .message(box):
304
0
      self.receiveFromChannel(message: box.message)
305
0
306
0
    case let .trailingMetadata(trailers):
307
0
      self.receiveFromChannel(trailingMetadata: trailers)
308
0
309
0
    case let .status(status):
310
0
      self.receiveFromChannel(status: status)
311
0
    }
312
0
313
0
    // (We're the end of the channel. No need to forward anything.)
314
0
  }
315
}
316
317
extension ClientTransport {
318
  /// The `Channel` became active. Send out any buffered requests.
319
0
  private func transportActivated(channel: Channel) {
320
0
    if self.callEventLoop.inEventLoop {
321
0
      self._transportActivated(channel: channel)
322
0
    } else {
323
0
      self.callEventLoop.execute {
324
0
        self._transportActivated(channel: channel)
325
0
      }
326
0
    }
327
0
  }
328
329
  /// On-loop implementation of `transportActivated(channel:)`.
330
0
  private func _transportActivated(channel: Channel) {
331
0
    self.callEventLoop.assertInEventLoop()
332
0
333
0
    switch self.state.activate() {
334
0
    case .unbuffer:
335
0
      self.logger.addIPAddressMetadata(local: channel.localAddress, remote: channel.remoteAddress)
336
0
      self._pipeline?.logger = self.logger
337
0
      self.logger.debug("activated stream channel")
338
0
      self.channel = channel
339
0
      self.unbuffer()
340
0
341
0
    case .close:
342
0
      channel.close(mode: .all, promise: nil)
343
0
344
0
    case .doNothing:
345
0
      ()
346
0
    }
347
0
  }
348
349
  /// The `Channel` became inactive. Fail any buffered writes and forward an error to the
350
  /// interceptor pipeline if necessary.
351
0
  private func transportDeactivated() {
352
0
    if self.callEventLoop.inEventLoop {
353
0
      self._transportDeactivated()
354
0
    } else {
355
0
      self.callEventLoop.execute {
356
0
        self._transportDeactivated()
357
0
      }
358
0
    }
359
0
  }
360
361
  /// On-loop implementation of `transportDeactivated()`.
362
0
  private func _transportDeactivated() {
363
0
    self.callEventLoop.assertInEventLoop()
364
0
    switch self.state.deactivate() {
365
0
    case .doNothing:
366
0
      ()
367
0
368
0
    case .tearDown:
369
0
      let status = GRPCStatus(code: .unavailable, message: "Transport became inactive")
370
0
      self.forwardErrorToInterceptors(status)
371
0
      self.failBufferedWrites(with: status)
372
0
      self.channelPromise?.fail(status)
373
0
374
0
    case .failChannelPromise:
375
0
      self.channelPromise?.fail(GRPCError.AlreadyComplete())
376
0
    }
377
0
  }
378
379
  /// Drops any references to the `Channel` and interceptor pipeline.
380
0
  private func dropReferences() {
381
0
    if self.callEventLoop.inEventLoop {
382
0
      self.channel = nil
383
0
    } else {
384
0
      self.callEventLoop.execute {
385
0
        self.channel = nil
386
0
      }
387
0
    }
388
0
  }
389
390
  /// Handles an error caught in the pipeline or from elsewhere. The error may be forwarded to the
391
  /// interceptor pipeline and any buffered writes will be failed. Any underlying `Channel` will
392
  /// also be closed.
393
0
  internal func handleError(_ error: Error) {
394
0
    if self.callEventLoop.inEventLoop {
395
0
      self._handleError(error)
396
0
    } else {
397
0
      self.callEventLoop.execute {
398
0
        self._handleError(error)
399
0
      }
400
0
    }
401
0
  }
402
403
  /// On-loop implementation of `handleError(_:)`.
404
0
  private func _handleError(_ error: Error) {
405
0
    self.callEventLoop.assertInEventLoop()
406
0
407
0
    switch self.state.handleError() {
408
0
    case .doNothing:
409
0
      ()
410
0
411
0
    case .propagateError:
412
0
      self.forwardErrorToInterceptors(error)
413
0
      self.failBufferedWrites(with: error)
414
0
415
0
    case .propagateErrorAndClose:
416
0
      self.forwardErrorToInterceptors(error)
417
0
      self.failBufferedWrites(with: error)
418
0
      self.channel?.close(mode: .all, promise: nil)
419
0
    }
420
0
  }
421
422
  /// Receive initial metadata from the `Channel`.
423
0
  private func receiveFromChannel(initialMetadata headers: HPACKHeaders) {
424
0
    if self.callEventLoop.inEventLoop {
425
0
      self._receiveFromChannel(initialMetadata: headers)
426
0
    } else {
427
0
      self.callEventLoop.execute {
428
0
        self._receiveFromChannel(initialMetadata: headers)
429
0
      }
430
0
    }
431
0
  }
432
433
  /// On-loop implementation of `receiveFromChannel(initialMetadata:)`.
434
0
  private func _receiveFromChannel(initialMetadata headers: HPACKHeaders) {
435
0
    self.callEventLoop.assertInEventLoop()
436
0
    if self.state.channelRead(isEnd: false) {
437
0
      self.forwardToInterceptors(.metadata(headers))
438
0
    }
439
0
  }
440
441
  /// Receive response message bytes from the `Channel`.
442
0
  private func receiveFromChannel(message buffer: ByteBuffer) {
443
0
    if self.callEventLoop.inEventLoop {
444
0
      self._receiveFromChannel(message: buffer)
445
0
    } else {
446
0
      self.callEventLoop.execute {
447
0
        self._receiveFromChannel(message: buffer)
448
0
      }
449
0
    }
450
0
  }
451
452
  /// On-loop implementation of `receiveFromChannel(message:)`.
453
0
  private func _receiveFromChannel(message buffer: ByteBuffer) {
454
0
    self.callEventLoop.assertInEventLoop()
455
0
    do {
456
0
      let message = try self.deserializer.deserialize(byteBuffer: buffer)
457
0
      if self.state.channelRead(isEnd: false) {
458
0
        self.forwardToInterceptors(.message(message))
459
0
      }
460
0
    } catch {
461
0
      self.handleError(error)
462
0
    }
463
0
  }
464
465
  /// Receive trailing metadata from the `Channel`.
466
0
  private func receiveFromChannel(trailingMetadata trailers: HPACKHeaders) {
467
0
    // The `Channel` delivers trailers and `GRPCStatus` separately, we want to emit them together
468
0
    // in the interceptor pipeline.
469
0
    if self.callEventLoop.inEventLoop {
470
0
      self.trailers = trailers
471
0
    } else {
472
0
      self.callEventLoop.execute {
473
0
        self.trailers = trailers
474
0
      }
475
0
    }
476
0
  }
477
478
  /// Receive the final status from the `Channel`.
479
0
  private func receiveFromChannel(status: GRPCStatus) {
480
0
    if self.callEventLoop.inEventLoop {
481
0
      self._receiveFromChannel(status: status)
482
0
    } else {
483
0
      self.callEventLoop.execute {
484
0
        self._receiveFromChannel(status: status)
485
0
      }
486
0
    }
487
0
  }
488
489
  /// On-loop implementation of `receiveFromChannel(status:)`.
490
0
  private func _receiveFromChannel(status: GRPCStatus) {
491
0
    self.callEventLoop.assertInEventLoop()
492
0
    if self.state.channelRead(isEnd: true) {
493
0
      self.forwardToInterceptors(.end(status, self.trailers ?? [:]))
494
0
      self.trailers = nil
495
0
    }
496
0
  }
497
}
498
499
// MARK: - State Handling
500
501
private enum ClientTransportState {
502
  /// Idle. We're waiting for the RPC to be configured.
503
  ///
504
  /// Valid transitions:
505
  /// - `awaitingTransport` (the transport is being configured)
506
  /// - `closed` (the RPC cancels)
507
  case idle
508
509
  /// Awaiting transport. The RPC has requested transport and we're waiting for that transport to
510
  /// activate. We'll buffer any outbound messages from this state. Receiving messages from the
511
  /// transport in this state is an error.
512
  ///
513
  /// Valid transitions:
514
  /// - `activatingTransport` (the channel becomes active)
515
  /// - `closing` (the RPC cancels)
516
  /// - `closed` (the channel fails to become active)
517
  case awaitingTransport
518
519
  /// The transport is active but we're unbuffering any requests to write on that transport.
520
  /// We'll continue buffering in this state. Receiving messages from the transport in this state
521
  /// is okay.
522
  ///
523
  /// Valid transitions:
524
  /// - `active` (we finish unbuffering)
525
  /// - `closing` (the RPC cancels, the channel encounters an error)
526
  /// - `closed` (the channel becomes inactive)
527
  case activatingTransport
528
529
  /// Fully active. An RPC is in progress and is communicating over an active transport.
530
  ///
531
  /// Valid transitions:
532
  /// - `closing` (the RPC cancels, the channel encounters an error)
533
  /// - `closed` (the channel becomes inactive)
534
  case active
535
536
  /// Closing. Either the RPC was cancelled or any `Channel` associated with the transport hasn't
537
  /// become inactive yet.
538
  ///
539
  /// Valid transitions:
540
  /// - `closed` (the channel becomes inactive)
541
  case closing
542
543
  /// We're closed. Any writes from the RPC will be failed. Any responses from the transport will
544
  /// be ignored.
545
  ///
546
  /// Valid transitions:
547
  /// - none: this state is terminal.
548
  case closed
549
550
  /// Whether writes may be unbuffered in this state.
551
0
  internal var isUnbuffering: Bool {
552
0
    switch self {
553
0
    case .activatingTransport:
554
0
      return true
555
0
    case .idle, .awaitingTransport, .active, .closing, .closed:
556
0
      return false
557
0
    }
558
0
  }
559
560
  /// Whether this state allows writes to be buffered. (This is useful only to inform logging.)
561
0
  internal var mayBuffer: Bool {
562
0
    switch self {
563
0
    case .idle, .activatingTransport, .awaitingTransport:
564
0
      return true
565
0
    case .active, .closing, .closed:
566
0
      return false
567
0
    }
568
0
  }
569
}
570
571
extension ClientTransportState {
572
  /// The caller would like to configure the transport. Returns a boolean indicating whether we
573
  /// should configure it or not.
574
0
  mutating func configureTransport() -> Bool {
575
0
    switch self {
576
0
    // We're idle until we configure. Anything else is just a repeat request to configure.
577
0
    case .idle:
578
0
      self = .awaitingTransport
579
0
      return true
580
0
581
0
    case .awaitingTransport, .activatingTransport, .active, .closing, .closed:
582
0
      return false
583
0
    }
584
0
  }
585
586
  enum SendAction {
587
    /// Write the request into the buffer.
588
    case writeToBuffer
589
    /// Write the request into the channel.
590
    case writeToChannel
591
    /// The RPC has already completed, fail any promise associated with the write.
592
    case alreadyComplete
593
  }
594
595
  /// The pipeline would like to send a request part to the transport.
596
0
  mutating func send() -> SendAction {
597
0
    switch self {
598
0
    // We don't have any transport yet, just buffer the part.
599
0
    case .idle, .awaitingTransport, .activatingTransport:
600
0
      return .writeToBuffer
601
0
602
0
    // We have a `Channel`, we can pipe the write straight through.
603
0
    case .active:
604
0
      return .writeToChannel
605
0
606
0
    // The transport is going or has gone away. Fail the promise.
607
0
    case .closing, .closed:
608
0
      return .alreadyComplete
609
0
    }
610
0
  }
611
612
  enum UnbufferedAction {
613
    /// Nothing needs to be done.
614
    case doNothing
615
    /// Succeed the channel promise associated with the transport.
616
    case succeedChannelPromise
617
  }
618
619
  /// We finished dealing with the buffered writes.
620
0
  mutating func unbuffered() -> UnbufferedAction {
621
0
    switch self {
622
0
    // These can't happen since we only begin unbuffering when we transition to
623
0
    // '.activatingTransport', which must come after these two states..
624
0
    case .idle, .awaitingTransport:
625
0
      preconditionFailure("Requests can't be unbuffered before the transport is activated")
626
0
627
0
    // We dealt with any buffered writes. We can become active now. This is the only way to become
628
0
    // active.
629
0
    case .activatingTransport:
630
0
      self = .active
631
0
      return .succeedChannelPromise
632
0
633
0
    case .active:
634
0
      preconditionFailure("Unbuffering completed but the transport is already active")
635
0
636
0
    // Something caused us to close while unbuffering, that's okay, we won't take any further
637
0
    // action.
638
0
    case .closing, .closed:
639
0
      return .doNothing
640
0
    }
641
0
  }
642
643
  /// Cancel the RPC and associated `Channel`, if possible. Returns a boolean indicated whether
644
  /// cancellation can go ahead (and also whether the channel should be torn down).
645
0
  mutating func cancel() -> Bool {
646
0
    switch self {
647
0
    case .idle:
648
0
      // No RPC has been started and we don't have a `Channel`. We need to tell the interceptor
649
0
      // we're done, fail any writes, and then deal with the cancellation promise.
650
0
      self = .closed
651
0
      return true
652
0
653
0
    case .awaitingTransport:
654
0
      // An RPC has started and we're waiting for the `Channel` to activate. We'll mark ourselves as
655
0
      // closing. We don't need to explicitly close the `Channel`, this will happen as a result of
656
0
      // the `Channel` becoming active (see `channelActive(context:)`).
657
0
      self = .closing
658
0
      return true
659
0
660
0
    case .activatingTransport:
661
0
      // The RPC has started, the `Channel` is active and we're emptying our write buffer. We'll
662
0
      // mark ourselves as closing: we'll error the interceptor pipeline, close the channel, fail
663
0
      // any buffered writes and then complete the cancellation promise.
664
0
      self = .closing
665
0
      return true
666
0
667
0
    case .active:
668
0
      // The RPC and channel are up and running. We'll fail the RPC and close the channel.
669
0
      self = .closing
670
0
      return true
671
0
672
0
    case .closing, .closed:
673
0
      // We're already closing or closing. The cancellation is too late.
674
0
      return false
675
0
    }
676
0
  }
677
678
  enum ActivateAction {
679
    case unbuffer
680
    case close
681
    case doNothing
682
  }
683
684
  /// `channelActive` was invoked on the transport by the `Channel`.
685
0
  mutating func activate() -> ActivateAction {
686
0
    // The channel has become active: what now?
687
0
    switch self {
688
0
    case .idle:
689
0
      preconditionFailure("Can't activate an idle transport")
690
0
691
0
    case .awaitingTransport:
692
0
      self = .activatingTransport
693
0
      return .unbuffer
694
0
695
0
    case .activatingTransport, .active:
696
0
      // Already activated.
697
0
      return .doNothing
698
0
699
0
    case .closing:
700
0
      // We remain in closing: we only transition to closed on 'channelInactive'.
701
0
      return .close
702
0
703
0
    case .closed:
704
0
      preconditionFailure("Invalid state: stream is already inactive")
705
0
    }
706
0
  }
707
708
  enum ChannelInactiveAction {
709
    /// Tear down the transport; forward an error to the interceptors and fail any buffered writes.
710
    case tearDown
711
    /// Fail the 'Channel' promise, if one exists; the RPC is already complete.
712
    case failChannelPromise
713
    /// Do nothing.
714
    case doNothing
715
  }
716
717
  /// `channelInactive` was invoked on the transport by the `Channel`.
718
0
  mutating func deactivate() -> ChannelInactiveAction {
719
0
    switch self {
720
0
    case .idle:
721
0
      // We can't become inactive before we've requested a `Channel`.
722
0
      preconditionFailure("Can't deactivate an idle transport")
723
0
724
0
    case .awaitingTransport, .activatingTransport, .active:
725
0
      // We're activating the transport - i.e. offloading any buffered requests - and the channel
726
0
      // became inactive. We haven't received an error (otherwise we'd be `closing`) so we should
727
0
      // synthesize an error status to fail the RPC with.
728
0
      self = .closed
729
0
      return .tearDown
730
0
731
0
    case .closing:
732
0
      // We were already closing, now we're fully closed.
733
0
      self = .closed
734
0
      return .failChannelPromise
735
0
736
0
    case .closed:
737
0
      // We're already closed.
738
0
      return .doNothing
739
0
    }
740
0
  }
741
742
  /// `channelRead` was invoked on the transport by the `Channel`. Returns a boolean value
743
  /// indicating whether the part that was read should be forwarded to the interceptor pipeline.
744
0
  mutating func channelRead(isEnd: Bool) -> Bool {
745
0
    switch self {
746
0
    case .idle, .awaitingTransport:
747
0
      // If there's no `Channel` or the `Channel` isn't active, then we can't read anything.
748
0
      preconditionFailure("Can't receive response part on idle transport")
749
0
750
0
    case .activatingTransport, .active:
751
0
      // We have an active `Channel`, we can forward the request part but we may need to start
752
0
      // closing if we see the status, since it indicates the call is terminating.
753
0
      if isEnd {
754
0
        self = .closing
755
0
      }
756
0
      return true
757
0
758
0
    case .closing, .closed:
759
0
      // We closed early, ignore any reads.
760
0
      return false
761
0
    }
762
0
  }
763
764
  enum HandleErrorAction {
765
    /// Propagate the error to the interceptor pipeline and fail any buffered writes.
766
    case propagateError
767
    /// As above, but close the 'Channel' as well.
768
    case propagateErrorAndClose
769
    /// No action is required.
770
    case doNothing
771
  }
772
773
  /// An error was caught.
774
0
  mutating func handleError() -> HandleErrorAction {
775
0
    switch self {
776
0
    case .idle:
777
0
      // The `Channel` can't error if it doesn't exist.
778
0
      preconditionFailure("Can't catch error on idle transport")
779
0
780
0
    case .awaitingTransport:
781
0
      // We're waiting for the `Channel` to become active. We're toast now, so close, failing any
782
0
      // buffered writes along the way.
783
0
      self = .closing
784
0
      return .propagateError
785
0
786
0
    case .activatingTransport,
787
0
      .active:
788
0
      // We're either fully active or unbuffering. Forward an error, fail any writes and then close.
789
0
      self = .closing
790
0
      return .propagateErrorAndClose
791
0
792
0
    case .closing, .closed:
793
0
      // We're already closing/closed, we can ignore this.
794
0
      return .doNothing
795
0
    }
796
0
  }
797
798
  enum GetChannelAction {
799
    /// No action is required.
800
    case doNothing
801
    /// Succeed the Channel promise.
802
    case succeed
803
    /// Fail the 'Channel' promise, the RPC is already complete.
804
    case fail
805
  }
806
807
  /// The caller has asked for the underlying `Channel`.
808
0
  mutating func getChannel() -> GetChannelAction {
809
0
    switch self {
810
0
    case .idle, .awaitingTransport, .activatingTransport:
811
0
      // Do nothing, we'll complete the promise when we become active or closed.
812
0
      return .doNothing
813
0
814
0
    case .active:
815
0
      // We're already active, so there was no promise to succeed when we made this transition. We
816
0
      // can complete it now.
817
0
      return .succeed
818
0
819
0
    case .closing:
820
0
      // We'll complete the promise when we transition to closed.
821
0
      return .doNothing
822
0
823
0
    case .closed:
824
0
      // We're already closed; there was no promise to fail when we made this transition. We can go
825
0
      // ahead and fail it now though.
826
0
      return .fail
827
0
    }
828
0
  }
829
}
830
831
// MARK: - State Actions
832
833
extension ClientTransport {
834
  /// Configures this transport with the `configurator`.
835
0
  private func configure(using configurator: (ChannelHandler) -> EventLoopFuture<Void>) {
836
0
    configurator(self).whenFailure { error in
837
0
      // We might be on a different EL, but `handleError` will sort that out for us, so no need to
838
0
      // hop.
839
0
      if error is GRPCStatus || error is GRPCStatusTransformable {
840
0
        self.handleError(error)
841
0
      } else {
842
0
        // Fallback to something which will mark the RPC as 'unavailable'.
843
0
        self.handleError(ConnectionFailure(reason: error))
844
0
      }
845
0
    }
846
0
  }
847
848
  /// Append a request part to the write buffer.
849
  /// - Parameters:
850
  ///   - part: The request part to buffer.
851
  ///   - promise: A promise to complete when the request part has been sent.
852
  private func buffer(
853
    _ part: GRPCClientRequestPart<Request>,
854
    promise: EventLoopPromise<Void>?
855
0
  ) {
856
0
    self.callEventLoop.assertInEventLoop()
857
0
    self.logger.trace(
858
0
      "buffering request part",
859
0
      metadata: [
860
0
        "request_part": "\(part.name)",
861
0
        "call_state": self.stateForLogging,
862
0
      ]
863
0
    )
864
0
    self.writeBuffer.append(.init(request: part, promise: promise))
865
0
  }
866
867
  /// Writes any buffered request parts to the `Channel`.
868
0
  private func unbuffer() {
869
0
    self.callEventLoop.assertInEventLoop()
870
0
871
0
    guard let channel = self.channel else {
872
0
      return
873
0
    }
874
0
875
0
    // Save any flushing until we're done writing.
876
0
    var shouldFlush = false
877
0
878
0
    self.logger.trace(
879
0
      "unbuffering request parts",
880
0
      metadata: [
881
0
        "request_parts": "\(self.writeBuffer.count)"
882
0
      ]
883
0
    )
884
0
885
0
    // Why the double loop? A promise completed as a result of the flush may enqueue more writes,
886
0
    // or causes us to change state (i.e. we may have to close). If we didn't loop around then we
887
0
    // may miss more buffered writes.
888
0
    while self.state.isUnbuffering, !self.writeBuffer.isEmpty {
889
0
      // Pull out as many writes as possible.
890
0
      while let write = self.writeBuffer.popFirst() {
891
0
        self.logger.trace(
892
0
          "unbuffering request part",
893
0
          metadata: [
894
0
            "request_part": "\(write.request.name)"
895
0
          ]
896
0
        )
897
0
898
0
        if !shouldFlush {
899
0
          shouldFlush = self.shouldFlush(after: write.request)
900
0
        }
901
0
902
0
        self.writeToChannel(channel, part: write.request, promise: write.promise, flush: false)
903
0
      }
904
0
905
0
      // Okay, flush now.
906
0
      if shouldFlush {
907
0
        shouldFlush = false
908
0
        channel.flush()
909
0
      }
910
0
    }
911
0
912
0
    if self.writeBuffer.isEmpty {
913
0
      self.logger.trace("request buffer drained")
914
0
    } else {
915
0
      self.logger.notice("unbuffering aborted", metadata: ["call_state": self.stateForLogging])
916
0
    }
917
0
918
0
    // We're unbuffered. What now?
919
0
    switch self.state.unbuffered() {
920
0
    case .doNothing:
921
0
      ()
922
0
    case .succeedChannelPromise:
923
0
      self.channelPromise?.succeed(channel)
924
0
    }
925
0
  }
926
927
  /// Fails any promises that come with buffered writes with `error`.
928
  /// - Parameter error: The `Error` to fail promises with.
929
0
  private func failBufferedWrites(with error: Error) {
930
0
    self.logger.trace("failing buffered writes", metadata: ["call_state": self.stateForLogging])
931
0
932
0
    while let write = self.writeBuffer.popFirst() {
933
0
      write.promise?.fail(error)
934
0
    }
935
0
  }
936
937
  /// Write a request part to the `Channel`.
938
  /// - Parameters:
939
  ///   - channel: The `Channel` to write `part` to.
940
  ///   - part: The request part to write.
941
  ///   - promise: A promise to complete once the write has been completed.
942
  ///   - flush: Whether to flush the `Channel` after writing.
943
  private func writeToChannel(
944
    _ channel: Channel,
945
    part: GRPCClientRequestPart<Request>,
946
    promise: EventLoopPromise<Void>?,
947
    flush: Bool
948
0
  ) {
949
0
    switch part {
950
0
    case let .metadata(headers):
951
0
      let head = self.makeRequestHead(with: headers)
952
0
      channel.write(self.wrapOutboundOut(.head(head)), promise: promise)
953
0
      // Messages are buffered by this class and in the async writer for async calls. Initially the
954
0
      // async writer is not allowed to emit messages; the call to 'onStart()' signals that messages
955
0
      // may be emitted. We call it here to avoid races between writing headers and messages.
956
0
      self.onStart?()
957
0
      self.onStart = nil
958
0
959
0
    case let .message(request, metadata):
960
0
      do {
961
0
        let bytes = try self.serializer.serialize(request, allocator: channel.allocator)
962
0
        let message = _MessageContext<ByteBuffer>(bytes, compressed: metadata.compress)
963
0
        channel.write(self.wrapOutboundOut(.message(message)), promise: promise)
964
0
      } catch {
965
0
        self.handleError(error)
966
0
      }
967
0
968
0
    case .end:
969
0
      channel.write(self.wrapOutboundOut(.end), promise: promise)
970
0
    }
971
0
972
0
    if flush {
973
0
      channel.flush()
974
0
    }
975
0
  }
976
977
  /// Forward the response part to the interceptor pipeline.
978
  /// - Parameter part: The response part to forward.
979
0
  private func forwardToInterceptors(_ part: GRPCClientResponsePart<Response>) {
980
0
    self.callEventLoop.assertInEventLoop()
981
0
    self._pipeline?.receive(part)
982
0
  }
983
984
  /// Forward the error to the interceptor pipeline.
985
  /// - Parameter error: The error to forward.
986
0
  private func forwardErrorToInterceptors(_ error: Error) {
987
0
    self.callEventLoop.assertInEventLoop()
988
0
    self._pipeline?.errorCaught(error)
989
0
  }
990
}
991
992
// MARK: - Helpers
993
994
extension ClientTransport {
995
  /// Returns whether the `Channel` should be flushed after writing the given part to it.
996
0
  private func shouldFlush(after part: GRPCClientRequestPart<Request>) -> Bool {
997
0
    switch part {
998
0
    case .metadata:
999
0
      // If we're not streaming requests then we hold off on the flush until we see end.
1000
0
      return self.isStreamingRequests
1001
0
1002
0
    case let .message(_, metadata):
1003
0
      // Message flushing is determined by caller preference.
1004
0
      return metadata.flush
1005
0
1006
0
    case .end:
1007
0
      // Always flush at the end of the request stream.
1008
0
      return true
1009
0
    }
1010
0
  }
1011
1012
  /// Make a `_GRPCRequestHead` with the provided metadata.
1013
0
  private func makeRequestHead(with metadata: HPACKHeaders) -> _GRPCRequestHead {
1014
0
    return _GRPCRequestHead(
1015
0
      method: self.callDetails.options.cacheable ? "GET" : "POST",
1016
0
      scheme: self.callDetails.scheme,
1017
0
      path: self.callDetails.path,
1018
0
      host: self.callDetails.authority,
1019
0
      deadline: self.callDetails.options.timeLimit.makeDeadline(),
1020
0
      customMetadata: metadata,
1021
0
      encoding: self.callDetails.options.messageEncoding
1022
0
    )
1023
0
  }
1024
}
1025
1026
extension GRPCClientRequestPart {
1027
  /// The name of the request part, used for logging.
1028
0
  fileprivate var name: String {
1029
0
    switch self {
1030
0
    case .metadata:
1031
0
      return "metadata"
1032
0
    case .message:
1033
0
      return "message"
1034
0
    case .end:
1035
0
      return "end"
1036
0
    }
1037
0
  }
1038
}
1039
1040
// A wrapper for connection errors: we need to be able to preserve the underlying error as
1041
// well as extract a 'GRPCStatus' with code '.unavailable'.
1042
internal struct ConnectionFailure: Error, GRPCStatusTransformable, CustomStringConvertible {
1043
  /// The reason the connection failed.
1044
  var reason: Error
1045
1046
0
  init(reason: Error) {
1047
0
    self.reason = reason
1048
0
  }
1049
1050
0
  var description: String {
1051
0
    return String(describing: self.reason)
1052
0
  }
1053
1054
0
  func makeGRPCStatus() -> GRPCStatus {
1055
0
    return GRPCStatus(
1056
0
      code: .unavailable,
1057
0
      message: String(describing: self.reason),
1058
0
      cause: self.reason
1059
0
    )
1060
0
  }
1061
}