/src/grpc-swift/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncBidirectionalStreamingCall.swift
Line | Count | Source (jump to first uncovered line) |
1 | | /* |
2 | | * Copyright 2021, gRPC Authors All rights reserved. |
3 | | * |
4 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | | * you may not use this file except in compliance with the License. |
6 | | * You may obtain a copy of the License at |
7 | | * |
8 | | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | | * |
10 | | * Unless required by applicable law or agreed to in writing, software |
11 | | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | | * See the License for the specific language governing permissions and |
14 | | * limitations under the License. |
15 | | */ |
16 | | import NIOCore |
17 | | import NIOHPACK |
18 | | |
19 | | /// Async-await variant of ``BidirectionalStreamingCall``. |
20 | | @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) |
21 | | public struct GRPCAsyncBidirectionalStreamingCall<Request: Sendable, Response: Sendable>: Sendable { |
22 | | private let call: Call<Request, Response> |
23 | | private let responseParts: StreamingResponseParts<Response> |
24 | | private let responseSource: |
25 | | NIOThrowingAsyncSequenceProducer< |
26 | | Response, |
27 | | Error, |
28 | | NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark, |
29 | | GRPCAsyncSequenceProducerDelegate |
30 | | >.Source |
31 | | private let requestSink: AsyncSink<(Request, Compression)> |
32 | | |
33 | | /// A request stream writer for sending messages to the server. |
34 | | public let requestStream: GRPCAsyncRequestStreamWriter<Request> |
35 | | |
36 | | /// The stream of responses from the server. |
37 | | public let responseStream: GRPCAsyncResponseStream<Response> |
38 | | |
39 | | /// The options used to make the RPC. |
40 | 0 | public var options: CallOptions { |
41 | 0 | return self.call.options |
42 | 0 | } |
43 | | |
44 | | /// The path used to make the RPC. |
45 | 0 | public var path: String { |
46 | 0 | return self.call.path |
47 | 0 | } |
48 | | |
49 | | /// Cancel this RPC if it hasn't already completed. |
50 | 0 | public func cancel() { |
51 | 0 | self.call.cancel(promise: nil) |
52 | 0 | } |
53 | | |
54 | | // MARK: - Response Parts |
55 | | |
56 | 0 | private func withRPCCancellation<R: Sendable>(_ fn: () async throws -> R) async rethrows -> R { |
57 | 0 | return try await withTaskCancellationHandler(operation: fn) { |
58 | 0 | self.cancel() |
59 | 0 | } |
60 | 0 | } |
61 | | |
62 | | /// The initial metadata returned from the server. |
63 | | /// |
64 | | /// - Important: The initial metadata will only be available when the first response has been |
65 | | /// received. However, it is not necessary for the response to have been consumed before reading |
66 | | /// this property. |
67 | | public var initialMetadata: HPACKHeaders { |
68 | 0 | get async throws { |
69 | 0 | try await self.withRPCCancellation { |
70 | 0 | try await self.responseParts.initialMetadata.get() |
71 | 0 | } |
72 | 0 | } |
73 | | } |
74 | | |
75 | | /// The trailing metadata returned from the server. |
76 | | /// |
77 | | /// - Important: Awaiting this property will suspend until the responses have been consumed. |
78 | | public var trailingMetadata: HPACKHeaders { |
79 | 0 | get async throws { |
80 | 0 | try await self.withRPCCancellation { |
81 | 0 | try await self.responseParts.trailingMetadata.get() |
82 | 0 | } |
83 | 0 | } |
84 | | } |
85 | | |
86 | | /// The final status of the RPC. |
87 | | /// |
88 | | /// - Important: Awaiting this property will suspend until the responses have been consumed. |
89 | | public var status: GRPCStatus { |
90 | 0 | get async { |
91 | 0 | // force-try acceptable because any error is encapsulated in a successful GRPCStatus future. |
92 | 0 | await self.withRPCCancellation { |
93 | 0 | try! await self.responseParts.status.get() |
94 | 0 | } |
95 | 0 | } |
96 | | } |
97 | | |
98 | 0 | private init(call: Call<Request, Response>) { |
99 | 0 | self.call = call |
100 | 0 | self.responseParts = StreamingResponseParts(on: call.eventLoop) { _ in } |
101 | 0 |
|
102 | 0 | let sequenceProducer = NIOThrowingAsyncSequenceProducer< |
103 | 0 | Response, |
104 | 0 | Error, |
105 | 0 | NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark, |
106 | 0 | GRPCAsyncSequenceProducerDelegate |
107 | 0 | >.makeSequence( |
108 | 0 | backPressureStrategy: .init(lowWatermark: 10, highWatermark: 50), |
109 | 0 | delegate: GRPCAsyncSequenceProducerDelegate() |
110 | 0 | ) |
111 | 0 |
|
112 | 0 | self.responseSource = sequenceProducer.source |
113 | 0 | self.responseStream = .init(sequenceProducer.sequence) |
114 | 0 | let (requestStream, requestSink) = call.makeRequestStreamWriter() |
115 | 0 | self.requestStream = requestStream |
116 | 0 | self.requestSink = AsyncSink(wrapping: requestSink) |
117 | 0 | } |
118 | | |
119 | | /// We expose this as the only non-private initializer so that the caller |
120 | | /// knows that invocation is part of initialisation. |
121 | 0 | internal static func makeAndInvoke(call: Call<Request, Response>) -> Self { |
122 | 0 | let asyncCall = Self(call: call) |
123 | 0 |
|
124 | 0 | asyncCall.call.invokeStreamingRequests( |
125 | 0 | onStart: { |
126 | 0 | asyncCall.requestSink.setWritability(to: true) |
127 | 0 | }, |
128 | 0 | onError: { error in |
129 | 0 | asyncCall.responseParts.handleError(error) |
130 | 0 | asyncCall.responseSource.finish(error) |
131 | 0 | asyncCall.requestSink.finish(error: error) |
132 | 0 | }, |
133 | 0 | onResponsePart: AsyncCall.makeResponsePartHandler( |
134 | 0 | responseParts: asyncCall.responseParts, |
135 | 0 | responseSource: asyncCall.responseSource, |
136 | 0 | requestStream: asyncCall.requestStream |
137 | 0 | ) |
138 | 0 | ) |
139 | 0 |
|
140 | 0 | return asyncCall |
141 | 0 | } |
142 | | } |
143 | | |
144 | | @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) |
145 | | internal enum AsyncCall { |
146 | | internal static func makeResponsePartHandler<Response, Request>( |
147 | | responseParts: StreamingResponseParts<Response>, |
148 | | responseSource: NIOThrowingAsyncSequenceProducer< |
149 | | Response, |
150 | | Error, |
151 | | NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark, |
152 | | GRPCAsyncSequenceProducerDelegate |
153 | | >.Source, |
154 | | requestStream: GRPCAsyncRequestStreamWriter<Request>?, |
155 | | requestType: Request.Type = Request.self |
156 | 0 | ) -> (GRPCClientResponsePart<Response>) -> Void { |
157 | 0 | return { responsePart in |
158 | 0 | // Handle the metadata, trailers and status. |
159 | 0 | responseParts.handle(responsePart) |
160 | 0 |
|
161 | 0 | // Handle the response messages and status. |
162 | 0 | switch responsePart { |
163 | 0 | case .metadata: |
164 | 0 | () |
165 | 0 |
|
166 | 0 | case let .message(response): |
167 | 0 | // TODO: when we support backpressure we will need to stop ignoring the return value. |
168 | 0 | _ = responseSource.yield(response) |
169 | 0 |
|
170 | 0 | case let .end(status, _): |
171 | 0 | if status.isOk { |
172 | 0 | responseSource.finish() |
173 | 0 | } else { |
174 | 0 | responseSource.finish(status) |
175 | 0 | } |
176 | 0 | requestStream?.finish(status) |
177 | 0 | } |
178 | 0 | } |
179 | 0 | } |
180 | | |
181 | | internal static func makeResponsePartHandler<Response, Request>( |
182 | | responseParts: UnaryResponseParts<Response>, |
183 | | requestStream: GRPCAsyncRequestStreamWriter<Request>?, |
184 | | requestType: Request.Type = Request.self, |
185 | | responseType: Response.Type = Response.self |
186 | 0 | ) -> (GRPCClientResponsePart<Response>) -> Void { |
187 | 0 | return { responsePart in |
188 | 0 | // Handle (most of) all parts. |
189 | 0 | responseParts.handle(responsePart) |
190 | 0 |
|
191 | 0 | // Handle the status. |
192 | 0 | switch responsePart { |
193 | 0 | case .metadata, .message: |
194 | 0 | () |
195 | 0 | case let .end(status, _): |
196 | 0 | requestStream?.finish(status) |
197 | 0 | } |
198 | 0 | } |
199 | 0 | } |
200 | | } |