Coverage Report

Created: 2025-09-04 06:32

/src/grpc-swift/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncBidirectionalStreamingCall.swift
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright 2021, gRPC Authors All rights reserved.
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
import NIOCore
17
import NIOHPACK
18
19
/// Async-await variant of ``BidirectionalStreamingCall``.
20
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
21
public struct GRPCAsyncBidirectionalStreamingCall<Request: Sendable, Response: Sendable>: Sendable {
22
  private let call: Call<Request, Response>
23
  private let responseParts: StreamingResponseParts<Response>
24
  private let responseSource:
25
    NIOThrowingAsyncSequenceProducer<
26
      Response,
27
      Error,
28
      NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
29
      GRPCAsyncSequenceProducerDelegate
30
    >.Source
31
  private let requestSink: AsyncSink<(Request, Compression)>
32
33
  /// A request stream writer for sending messages to the server.
34
  public let requestStream: GRPCAsyncRequestStreamWriter<Request>
35
36
  /// The stream of responses from the server.
37
  public let responseStream: GRPCAsyncResponseStream<Response>
38
39
  /// The options used to make the RPC.
40
0
  public var options: CallOptions {
41
0
    return self.call.options
42
0
  }
43
44
  /// The path used to make the RPC.
45
0
  public var path: String {
46
0
    return self.call.path
47
0
  }
48
49
  /// Cancel this RPC if it hasn't already completed.
50
0
  public func cancel() {
51
0
    self.call.cancel(promise: nil)
52
0
  }
53
54
  // MARK: - Response Parts
55
56
0
  private func withRPCCancellation<R: Sendable>(_ fn: () async throws -> R) async rethrows -> R {
57
0
    return try await withTaskCancellationHandler(operation: fn) {
58
0
      self.cancel()
59
0
    }
60
0
  }
61
62
  /// The initial metadata returned from the server.
63
  ///
64
  /// - Important: The initial metadata will only be available when the first response has been
65
  /// received. However, it is not necessary for the response to have been consumed before reading
66
  /// this property.
67
  public var initialMetadata: HPACKHeaders {
68
0
    get async throws {
69
0
      try await self.withRPCCancellation {
70
0
        try await self.responseParts.initialMetadata.get()
71
0
      }
72
0
    }
73
  }
74
75
  /// The trailing metadata returned from the server.
76
  ///
77
  /// - Important: Awaiting this property will suspend until the responses have been consumed.
78
  public var trailingMetadata: HPACKHeaders {
79
0
    get async throws {
80
0
      try await self.withRPCCancellation {
81
0
        try await self.responseParts.trailingMetadata.get()
82
0
      }
83
0
    }
84
  }
85
86
  /// The final status of the RPC.
87
  ///
88
  /// - Important: Awaiting this property will suspend until the responses have been consumed.
89
  public var status: GRPCStatus {
90
0
    get async {
91
0
      // Force-try is acceptable because any error is encapsulated in a successful GRPCStatus future.
92
0
      await self.withRPCCancellation {
93
0
        try! await self.responseParts.status.get()
94
0
      }
95
0
    }
96
  }
97
98
0
  private init(call: Call<Request, Response>) {
99
0
    self.call = call
100
0
    self.responseParts = StreamingResponseParts(on: call.eventLoop) { _ in }
101
0
102
0
    let sequenceProducer = NIOThrowingAsyncSequenceProducer<
103
0
      Response,
104
0
      Error,
105
0
      NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
106
0
      GRPCAsyncSequenceProducerDelegate
107
0
    >.makeSequence(
108
0
      backPressureStrategy: .init(lowWatermark: 10, highWatermark: 50),
109
0
      delegate: GRPCAsyncSequenceProducerDelegate()
110
0
    )
111
0
112
0
    self.responseSource = sequenceProducer.source
113
0
    self.responseStream = .init(sequenceProducer.sequence)
114
0
    let (requestStream, requestSink) = call.makeRequestStreamWriter()
115
0
    self.requestStream = requestStream
116
0
    self.requestSink = AsyncSink(wrapping: requestSink)
117
0
  }
118
119
  /// We expose this as the only non-private initializer so that the caller
120
  /// knows that invocation is part of initialisation.
121
0
  internal static func makeAndInvoke(call: Call<Request, Response>) -> Self {
122
0
    let asyncCall = Self(call: call)
123
0
124
0
    asyncCall.call.invokeStreamingRequests(
125
0
      onStart: {
126
0
        asyncCall.requestSink.setWritability(to: true)
127
0
      },
128
0
      onError: { error in
129
0
        asyncCall.responseParts.handleError(error)
130
0
        asyncCall.responseSource.finish(error)
131
0
        asyncCall.requestSink.finish(error: error)
132
0
      },
133
0
      onResponsePart: AsyncCall.makeResponsePartHandler(
134
0
        responseParts: asyncCall.responseParts,
135
0
        responseSource: asyncCall.responseSource,
136
0
        requestStream: asyncCall.requestStream
137
0
      )
138
0
    )
139
0
140
0
    return asyncCall
141
0
  }
142
}
143
144
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
145
internal enum AsyncCall {
146
  internal static func makeResponsePartHandler<Response, Request>(
147
    responseParts: StreamingResponseParts<Response>,
148
    responseSource: NIOThrowingAsyncSequenceProducer<
149
      Response,
150
      Error,
151
      NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
152
      GRPCAsyncSequenceProducerDelegate
153
    >.Source,
154
    requestStream: GRPCAsyncRequestStreamWriter<Request>?,
155
    requestType: Request.Type = Request.self
156
0
  ) -> (GRPCClientResponsePart<Response>) -> Void {
157
0
    return { responsePart in
158
0
      // Handle the metadata, trailers and status.
159
0
      responseParts.handle(responsePart)
160
0
161
0
      // Handle the response messages and status.
162
0
      switch responsePart {
163
0
      case .metadata:
164
0
        ()
165
0
166
0
      case let .message(response):
167
0
        // TODO: when we support backpressure we will need to stop ignoring the return value.
168
0
        _ = responseSource.yield(response)
169
0
170
0
      case let .end(status, _):
171
0
        if status.isOk {
172
0
          responseSource.finish()
173
0
        } else {
174
0
          responseSource.finish(status)
175
0
        }
176
0
        requestStream?.finish(status)
177
0
      }
178
0
    }
179
0
  }
180
181
  internal static func makeResponsePartHandler<Response, Request>(
182
    responseParts: UnaryResponseParts<Response>,
183
    requestStream: GRPCAsyncRequestStreamWriter<Request>?,
184
    requestType: Request.Type = Request.self,
185
    responseType: Response.Type = Response.self
186
0
  ) -> (GRPCClientResponsePart<Response>) -> Void {
187
0
    return { responsePart in
188
0
      // Handle (most of) all parts.
189
0
      responseParts.handle(responsePart)
190
0
191
0
      // Handle the status.
192
0
      switch responsePart {
193
0
      case .metadata, .message:
194
0
        ()
195
0
      case let .end(status, _):
196
0
        requestStream?.finish(status)
197
0
      }
198
0
    }
199
0
  }
200
}