Coverage Report

Created: 2026-03-11 06:25

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/grpc-swift/Sources/GRPC/ClientCalls/Call.swift
Line
Count
Source
1
/*
2
 * Copyright 2020, gRPC Authors All rights reserved.
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
import Logging
17
import NIOCore
18
import NIOHPACK
19
import NIOHTTP2
20
21
import protocol SwiftProtobuf.Message
22
23
/// An object representing a single RPC from the perspective of a client. It allows the caller to
24
/// send request parts, request a cancellation, and receive response parts in a provided callback.
25
///
26
/// The call object sits atop an interceptor pipeline (see ``ClientInterceptor``) which allows for
27
/// request and response streams to be arbitrarily transformed or observed. Requests sent via this
28
/// call will traverse the pipeline before reaching the network, and responses received will
29
/// traverse the pipeline having been received from the network.
30
///
31
/// This object is a lower-level API than the equivalent wrapped calls (such as ``UnaryCall`` and
32
/// ``BidirectionalStreamingCall``). The caller is therefore required to do more in order to use this
33
/// object correctly. Callers must call ``invoke(onError:onResponsePart:)`` to start the call and ensure that the correct
34
/// number of request parts are sent in the correct order (exactly one `metadata`, followed
35
/// by at most one `message` for unary and server streaming calls, and any number of `message` parts
36
/// for client streaming and bidirectional streaming calls). All call types must terminate their
37
/// request stream by sending one `end` message.
38
///
39
/// Callers are not able to create ``Call`` objects directly, rather they must be created via an
40
/// object conforming to ``GRPCChannel`` such as ``ClientConnection``.
41
public final class Call<Request, Response> {
42
  @usableFromInline
43
  internal enum State {
44
    /// Idle, waiting to be invoked.
45
    case idle(ClientTransportFactory<Request, Response>)
46
47
    /// Invoked, we have a transport on which to send requests. The transport may be closed if the
48
    /// RPC has already completed.
49
    case invoked(ClientTransport<Request, Response>)
50
  }
51
52
  /// The current state of the call.
53
  @usableFromInline
54
  internal var _state: State
55
56
  /// User provided interceptors for the call.
57
  @usableFromInline
58
  internal let _interceptors: [ClientInterceptor<Request, Response>]
59
60
  /// Whether compression is enabled on the call.
61
0
  private var isCompressionEnabled: Bool {
62
0
    return self.options.messageEncoding.enabledForRequests
63
0
  }
64
65
  /// The `EventLoop` the call is being invoked on.
66
  public let eventLoop: EventLoop
67
68
  /// The path of the RPC, usually generated from a service definition, e.g. "/echo.Echo/Get".
69
  public let path: String
70
71
  /// The type of the RPC, e.g. unary, bidirectional streaming.
72
  public let type: GRPCCallType
73
74
  /// Options used to invoke the call.
75
  public let options: CallOptions
76
77
  /// A promise for the underlying `Channel`. We only allocate this if the user asks for
78
  /// the `Channel` and we haven't invoked the transport yet. It's a bit unfortunate.
79
  private var channelPromise: EventLoopPromise<Channel>?
80
81
  /// Returns a future for the underlying `Channel`.
82
0
  internal var channel: EventLoopFuture<Channel> {
83
0
    if self.eventLoop.inEventLoop {
84
0
      return self._channel()
85
0
    } else {
86
0
      return self.eventLoop.flatSubmit {
87
0
        return self._channel()
88
0
      }
89
0
    }
90
0
  }
91
92
  // Calls can't be constructed directly: users must make them using a `GRPCChannel`.
93
  @inlinable
94
  internal init(
95
    path: String,
96
    type: GRPCCallType,
97
    eventLoop: EventLoop,
98
    options: CallOptions,
99
    interceptors: [ClientInterceptor<Request, Response>],
100
    transportFactory: ClientTransportFactory<Request, Response>
101
0
  ) {
102
0
    self.path = path
103
0
    self.type = type
104
0
    self.options = options
105
0
    self._state = .idle(transportFactory)
106
0
    self.eventLoop = eventLoop
107
0
    self._interceptors = interceptors
108
0
  }
109
110
  /// Starts the call and provides a callback which is invoked on every response part received from
111
  /// the server.
112
  ///
113
  /// This must be called prior to ``send(_:)`` or ``cancel()``.
114
  ///
115
  /// - Parameters:
116
  ///   - onError: A callback invoked when an error is received.
117
  ///   - onResponsePart: A callback which is invoked on every response part.
118
  /// - Important: This function should only be called once. Subsequent calls will be ignored.
119
  @inlinable
120
  public func invoke(
121
    onError: @escaping (Error) -> Void,
122
    onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
123
0
  ) {
124
0
    self.options.logger.debug("starting rpc", metadata: ["path": "\(self.path)"], source: "GRPC")
125
0
126
0
    if self.eventLoop.inEventLoop {
127
0
      self._invoke(onStart: {}, onError: onError, onResponsePart: onResponsePart)
128
0
    } else {
129
0
      self.eventLoop.execute {
130
0
        self._invoke(onStart: {}, onError: onError, onResponsePart: onResponsePart)
131
0
      }
132
0
    }
133
0
  }
134
135
  /// Send a request part on the RPC.
136
  /// - Parameters:
137
  ///   - part: The request part to send.
138
  ///   - promise: A promise which will be completed when the request part has been handled.
139
  /// - Note: Sending will always fail if ``invoke(onError:onResponsePart:)`` has not been called.
140
  @inlinable
141
0
  public func send(_ part: GRPCClientRequestPart<Request>, promise: EventLoopPromise<Void>?) {
142
0
    if self.eventLoop.inEventLoop {
143
0
      self._send(part, promise: promise)
144
0
    } else {
145
0
      self.eventLoop.execute {
146
0
        self._send(part, promise: promise)
147
0
      }
148
0
    }
149
0
  }
150
151
  /// Attempt to cancel the RPC.
152
  /// - Parameter promise: A promise which will be completed once the cancellation request has been
153
  ///   dealt with.
154
  /// - Note: If ``invoke(onError:onResponsePart:)`` has not been called there is no transport to cancel and the promise is succeeded.
155
0
  public func cancel(promise: EventLoopPromise<Void>?) {
156
0
    if self.eventLoop.inEventLoop {
157
0
      self._cancel(promise: promise)
158
0
    } else {
159
0
      self.eventLoop.execute {
160
0
        self._cancel(promise: promise)
161
0
      }
162
0
    }
163
0
  }
164
}
165
166
extension Call {
167
  /// Send a request part on the RPC.
168
  /// - Parameter part: The request part to send.
169
  /// - Returns: A future which will be resolved when the request has been handled.
170
  /// - Note: Sending will always fail if ``invoke(onError:onResponsePart:)`` has not been called.
171
  @inlinable
172
0
  public func send(_ part: GRPCClientRequestPart<Request>) -> EventLoopFuture<Void> {
173
0
    let promise = self.eventLoop.makePromise(of: Void.self)
174
0
    self.send(part, promise: promise)
175
0
    return promise.futureResult
176
0
  }
177
178
  /// Attempt to cancel the RPC.
179
  /// - Note: If ``invoke(onError:onResponsePart:)`` has not been called there is no transport to cancel and the returned future succeeds.
180
  /// - Returns: A future which will be resolved when the cancellation request has been dealt with.
181
0
  public func cancel() -> EventLoopFuture<Void> {
182
0
    let promise = self.eventLoop.makePromise(of: Void.self)
183
0
    self.cancel(promise: promise)
184
0
    return promise.futureResult
185
0
  }
186
}
187
188
extension Call {
189
0
  internal func compress(_ compression: Compression) -> Bool {
190
0
    return compression.isEnabled(callDefault: self.isCompressionEnabled)
191
0
  }
192
193
  internal func sendMessages<Messages>(
194
    _ messages: Messages,
195
    compression: Compression,
196
    promise: EventLoopPromise<Void>?
197
0
  ) where Messages: Sequence, Messages.Element == Request {
198
0
    if self.eventLoop.inEventLoop {
199
0
      if let promise = promise {
200
0
        self._sendMessages(messages, compression: compression, promise: promise)
201
0
      } else {
202
0
        self._sendMessages(messages, compression: compression)
203
0
      }
204
0
    } else {
205
0
      self.eventLoop.execute {
206
0
        if let promise = promise {
207
0
          self._sendMessages(messages, compression: compression, promise: promise)
208
0
        } else {
209
0
          self._sendMessages(messages, compression: compression)
210
0
        }
211
0
      }
212
0
    }
213
0
  }
214
215
  // Provide a few convenience methods we need from the wrapped call objects.
216
  private func _sendMessages<Messages>(
217
    _ messages: Messages,
218
    compression: Compression
219
0
  ) where Messages: Sequence, Messages.Element == Request {
220
0
    self.eventLoop.assertInEventLoop()
221
0
    let compress = self.compress(compression)
222
0
223
0
    var iterator = messages.makeIterator()
224
0
    var maybeNext = iterator.next()
225
0
    while let current = maybeNext {
226
0
      let next = iterator.next()
227
0
      // If there's no next message, then we'll flush.
228
0
      let flush = next == nil
229
0
      self._send(.message(current, .init(compress: compress, flush: flush)), promise: nil)
230
0
      maybeNext = next
231
0
    }
232
0
  }
233
234
  private func _sendMessages<Messages>(
235
    _ messages: Messages,
236
    compression: Compression,
237
    promise: EventLoopPromise<Void>
238
0
  ) where Messages: Sequence, Messages.Element == Request {
239
0
    self.eventLoop.assertInEventLoop()
240
0
    let compress = self.compress(compression)
241
0
242
0
    var iterator = messages.makeIterator()
243
0
    var maybeNext = iterator.next()
244
0
    while let current = maybeNext {
245
0
      let next = iterator.next()
246
0
      let isLast = next == nil
247
0
248
0
      // We're already on the event loop, use the `_` send.
249
0
      if isLast {
250
0
        // Only flush and attach the promise to the last message.
251
0
        self._send(.message(current, .init(compress: compress, flush: true)), promise: promise)
252
0
      } else {
253
0
        self._send(.message(current, .init(compress: compress, flush: false)), promise: nil)
254
0
      }
255
0
256
0
      maybeNext = next
257
0
    }
258
0
  }
259
}
260
261
extension Call {
262
  /// Invoke the RPC with this response part handler.
263
  /// - Important: This *must* be called from the `eventLoop`.
264
  @usableFromInline
265
  internal func _invoke(
266
    onStart: @escaping () -> Void,
267
    onError: @escaping (Error) -> Void,
268
    onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
269
0
  ) {
270
0
    self.eventLoop.assertInEventLoop()
271
0
272
0
    switch self._state {
273
0
    case let .idle(factory):
274
0
      let transport = factory.makeConfiguredTransport(
275
0
        to: self.path,
276
0
        for: self.type,
277
0
        withOptions: self.options,
278
0
        onEventLoop: self.eventLoop,
279
0
        interceptedBy: self._interceptors,
280
0
        onStart: onStart,
281
0
        onError: onError,
282
0
        onResponsePart: onResponsePart
283
0
      )
284
0
      self._state = .invoked(transport)
285
0
286
0
    case .invoked:
287
0
      // We can't be invoked twice. Just ignore this.
288
0
      ()
289
0
    }
290
0
  }
291
292
  /// Send a request part on the transport.
293
  /// - Important: This *must* be called from the `eventLoop`.
294
  @inlinable
295
0
  internal func _send(_ part: GRPCClientRequestPart<Request>, promise: EventLoopPromise<Void>?) {
296
0
    self.eventLoop.assertInEventLoop()
297
0
298
0
    switch self._state {
299
0
    case .idle:
300
0
      promise?.fail(GRPCError.InvalidState("Call must be invoked before sending request parts"))
301
0
302
0
    case let .invoked(transport):
303
0
      transport.send(part, promise: promise)
304
0
    }
305
0
  }
306
307
  /// Attempt to cancel the call.
308
  /// - Important: This *must* be called from the `eventLoop`.
309
0
  private func _cancel(promise: EventLoopPromise<Void>?) {
310
0
    self.eventLoop.assertInEventLoop()
311
0
312
0
    switch self._state {
313
0
    case .idle:
314
0
      promise?.succeed(())
315
0
      self.channelPromise?.fail(GRPCStatus(code: .cancelled))
316
0
317
0
    case let .invoked(transport):
318
0
      transport.cancel(promise: promise)
319
0
    }
320
0
  }
321
322
  /// Get the underlying `Channel` for this call.
323
  /// - Important: This *must* be called from the `eventLoop`.
324
0
  private func _channel() -> EventLoopFuture<Channel> {
325
0
    self.eventLoop.assertInEventLoop()
326
0
327
0
    switch (self.channelPromise, self._state) {
328
0
    case let (.some(promise), .idle),
329
0
      let (.some(promise), .invoked):
330
0
      // We already have a promise, just use that.
331
0
      return promise.futureResult
332
0
333
0
    case (.none, .idle):
334
0
      // We need to allocate a promise and ask the transport for the channel later.
335
0
      let promise = self.eventLoop.makePromise(of: Channel.self)
336
0
      self.channelPromise = promise
337
0
      return promise.futureResult
338
0
339
0
    case let (.none, .invoked(transport)):
340
0
      // Just ask the transport.
341
0
      return transport.getChannel()
342
0
    }
343
0
  }
344
}
345
346
extension Call {
347
  // These helpers are for our wrapping call objects (`UnaryCall`, etc.).
348
349
  /// Invokes the call and sends a single request. Sends the metadata, request and closes the
350
  /// request stream.
351
  /// - Parameters:
352
  ///   - request: The request to send.
353
  ///   - onError: A callback invoked when an error is received.
354
  ///   - onResponsePart: A callback invoked for each response part received.
355
  @inlinable
356
  internal func invokeUnaryRequest(
357
    _ request: Request,
358
    onStart: @escaping () -> Void,
359
    onError: @escaping (Error) -> Void,
360
    onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
361
0
  ) {
362
0
    if self.eventLoop.inEventLoop {
363
0
      self._invokeUnaryRequest(
364
0
        request: request,
365
0
        onStart: onStart,
366
0
        onError: onError,
367
0
        onResponsePart: onResponsePart
368
0
      )
369
0
    } else {
370
0
      self.eventLoop.execute {
371
0
        self._invokeUnaryRequest(
372
0
          request: request,
373
0
          onStart: onStart,
374
0
          onError: onError,
375
0
          onResponsePart: onResponsePart
376
0
        )
377
0
      }
378
0
    }
379
0
  }
380
381
  /// Invokes the call for streaming requests and sends the initial call metadata. Callers can send
382
  /// additional messages and end the stream by calling `send(_:promise:)`.
383
  /// - Parameters:
384
  ///   - onError: A callback invoked when an error is received.
385
  ///   - onResponsePart: A callback invoked for each response part received.
386
  @inlinable
387
  internal func invokeStreamingRequests(
388
    onStart: @escaping () -> Void,
389
    onError: @escaping (Error) -> Void,
390
    onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
391
0
  ) {
392
0
    if self.eventLoop.inEventLoop {
393
0
      self._invokeStreamingRequests(
394
0
        onStart: onStart,
395
0
        onError: onError,
396
0
        onResponsePart: onResponsePart
397
0
      )
398
0
    } else {
399
0
      self.eventLoop.execute {
400
0
        self._invokeStreamingRequests(
401
0
          onStart: onStart,
402
0
          onError: onError,
403
0
          onResponsePart: onResponsePart
404
0
        )
405
0
      }
406
0
    }
407
0
  }
408
409
  /// On-`EventLoop` implementation of `invokeUnaryRequest(_:onStart:onError:onResponsePart:)`.
410
  @usableFromInline
411
  internal func _invokeUnaryRequest(
412
    request: Request,
413
    onStart: @escaping () -> Void,
414
    onError: @escaping (Error) -> Void,
415
    onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
416
0
  ) {
417
0
    self.eventLoop.assertInEventLoop()
418
0
    assert(self.type == .unary || self.type == .serverStreaming)
419
0
420
0
    self._invoke(onStart: onStart, onError: onError, onResponsePart: onResponsePart)
421
0
    self._send(.metadata(self.options.customMetadata), promise: nil)
422
0
    self._send(
423
0
      .message(request, .init(compress: self.isCompressionEnabled, flush: false)),
424
0
      promise: nil
425
0
    )
426
0
    self._send(.end, promise: nil)
427
0
  }
428
429
  /// On-`EventLoop` implementation of `invokeStreamingRequests(onStart:onError:onResponsePart:)`.
430
  @usableFromInline
431
  internal func _invokeStreamingRequests(
432
    onStart: @escaping () -> Void,
433
    onError: @escaping (Error) -> Void,
434
    onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
435
0
  ) {
436
0
    self.eventLoop.assertInEventLoop()
437
0
    assert(self.type == .clientStreaming || self.type == .bidirectionalStreaming)
438
0
439
0
    self._invoke(onStart: onStart, onError: onError, onResponsePart: onResponsePart)
440
0
    self._send(.metadata(self.options.customMetadata), promise: nil)
441
0
  }
442
}
443
444
// @unchecked is ok: all mutable state is accessed/modified from the appropriate event loop.
445
extension Call: @unchecked Sendable where Request: Sendable, Response: Sendable {}